• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5011

03 Apr 2026 03:59PM UTC coverage: 72.3% (+0.008%) from 72.292%
#5011

push

travis-ci

web-flow
merge: from main to 3.0 branch #35067

4053 of 5985 new or added lines in 68 files covered. (67.72%)

732 existing lines in 143 files now uncovered.

257430 of 356056 relevant lines covered (72.3%)

131834103.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.91
/tools/taos-tools/src/benchInsert.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the MIT license as published by the Free Software
6
 * Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 */
12

13
#include <bench.h>
14
#include "benchLog.h"
15
#include "wrapDb.h"
16
#include <benchData.h>
17
#include <benchInsertMix.h>
18

19
static int32_t stmt2BindAndSubmit(
20
        threadInfo *pThreadInfo,
21
        SChildTable *childTbl,
22
        int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt, int64_t *delay1,
23
        int64_t *delay3, int64_t* startTs, int64_t* endTs, int32_t w);
24
TAOS_STMT2* initStmt2(TAOS* taos, bool single);
25

26
// Error-exit helper: releases the `pids` and `infos` variables of the
// enclosing function via tmfree() and makes that function return -1.
// Both names must be in scope at the expansion site.
#define FREE_PIDS_INFOS_RETURN_MINUS_1() \
    do {                                 \
        tmfree(pids);                    \
        tmfree(infos);                   \
        return -1;                       \
    } while (0)
32

33
// Per-thread teardown helper. Expects `pThreadInfo`, `pids` and `infos` to be
// in scope at the expansion site: closes the thread's connection when one is
// set, destroys its delay list, then releases `pids` and `infos`.
//
// The definition ends at `while (0)` with no trailing line continuation, so
// whatever follows the macro in the file is never spliced into it.
#define FREE_RESOURCE()                             \
    do {                                            \
        if (pThreadInfo->conn)                      \
            closeBenchConn(pThreadInfo->conn);      \
        benchArrayDestroy(pThreadInfo->delayList);  \
        tmfree(pids);                               \
        tmfree(infos);                              \
    } while (0)
41

42
// REST
43
static int getSuperTableFromServerRest(
14✔
44
    SDataBase* database, SSuperTable* stbInfo, char *command) {
45

46
    // TODO(zero): it will create super table based on this error code.
47
    return TSDB_CODE_NOT_FOUND;
14✔
48
    // TODO(me): finish full implementation
49
#if 0
50
    int sockfd = createSockFd();
51
    if (sockfd < 0) {
52
        return -1;
53
    }
54

55
    int code = postProcessSql(command,
56
                         database->dbName,
57
                         database->precision,
58
                         REST_IFACE,
59
                         0,
60
                         g_arguments->port,
61
                         false,
62
                         sockfd,
63
                         NULL);
64

65
    destroySockFd(sockfd);
66
#endif   // 0
67
}
68

69
static int getSuperTableFromServerTaosc(
36,347✔
70
        SDataBase *database, SSuperTable *stbInfo, char *command) {
71
    TAOS_RES *res;
72
    TAOS_ROW row = NULL;
36,347✔
73
    SBenchConn *conn = initBenchConn(database->dbName);
36,347✔
74
    if (NULL == conn) {
36,347✔
75
        return TSDB_CODE_FAILED;
×
76
    }
77

78
    res = taos_query(conn->taos, command);
36,347✔
79
    int32_t code = taos_errno(res);
36,347✔
80
    if (code != 0) {
36,347✔
81
        infoPrint("stable %s does not exist, will create one\n",
36,145✔
82
                  stbInfo->stbName);
83
        closeBenchConn(conn);
36,145✔
84
        return TSDB_CODE_NOT_FOUND;
36,145✔
85
    }
86
    infoPrint("find stable<%s>, will get meta data from server\n",
202✔
87
              stbInfo->stbName);
88

89
    // Check if the the existing super table matches exactly with the definitions in the JSON file.
90
    // If a hash table were used, the time complexity would be only O(n).
91
    // But taosBenchmark does not incorporate a hash table, the time complexity of the loop traversal is O(n^2).
92
    bool isTitleRow = true;
202✔
93
    uint32_t tag_count = 0;
202✔
94
    uint32_t col_count = 0;
202✔
95

96
    int fieldsNum = taos_num_fields(res);
202✔
97
    TAOS_FIELD_E* fields = taos_fetch_fields_e(res);
202✔
98

99
    if (fieldsNum < TSDB_MAX_DESCRIBE_METRIC || !fields) {
202✔
100
        errorPrint("%s", "failed to fetch fields\n");
×
101
        taos_free_result(res);
×
102
        closeBenchConn(conn);
×
103
        return TSDB_CODE_FAILED;
×
104
    }
105

106
    while ((row = taos_fetch_row(res)) != NULL) {
1,227✔
107
        if (isTitleRow) {
1,025✔
108
            isTitleRow = false;
202✔
109
            continue;
202✔
110
        }
111
        int32_t *lengths = taos_fetch_lengths(res);
823✔
112
        if (lengths == NULL) {
823✔
113
            errorPrint("%s", "failed to execute taos_fetch_length\n");
×
114
            taos_free_result(res);
×
115
            closeBenchConn(conn);
×
116
            return TSDB_CODE_FAILED;
×
117
        }
118
        if (strncasecmp((char *) row[TSDB_DESCRIBE_METRIC_NOTE_INDEX], "tag",
823✔
119
                        strlen("tag")) == 0) {
120
            if (stbInfo->tags == NULL || stbInfo->tags->size == 0 || tag_count >= stbInfo->tags->size) {
392✔
121
                errorPrint("%s", "existing stable tag mismatched with that defined in JSON\n");
×
122
                taos_free_result(res);
×
123
                closeBenchConn(conn);
×
124
                return TSDB_CODE_FAILED;
×
125
            }
126
            uint8_t tagType = convertStringToDatatype(
392✔
127
                    (char *) row[TSDB_DESCRIBE_METRIC_TYPE_INDEX],
392✔
128
                    lengths[TSDB_DESCRIBE_METRIC_TYPE_INDEX], &(fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].precision));
392✔
129
            char *tagName = (char *) row[TSDB_DESCRIBE_METRIC_FIELD_INDEX];
392✔
130
            if (!searchBArray(stbInfo->tags, tagName,
392✔
131
                              lengths[TSDB_DESCRIBE_METRIC_FIELD_INDEX], tagType)) {
132
                errorPrint("existing stable tag:%s not found in JSON tags.\n", tagName);
×
133
                taos_free_result(res);
×
134
                closeBenchConn(conn);
×
135
                return TSDB_CODE_FAILED;
×
136
            }
137
            tag_count += 1;
392✔
138
        } else {
139
            if (stbInfo->cols == NULL || stbInfo->cols->size == 0 || col_count >= stbInfo->cols->size) {
431✔
140
                errorPrint("%s", "existing stable column mismatched with that defined in JSON\n");
×
141
                taos_free_result(res);
×
142
                closeBenchConn(conn);
×
143
                return TSDB_CODE_FAILED;
×
144
            }
145
            uint8_t colType = convertStringToDatatype(
431✔
146
                    (char *) row[TSDB_DESCRIBE_METRIC_TYPE_INDEX],
431✔
147
                    lengths[TSDB_DESCRIBE_METRIC_TYPE_INDEX], &(fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].precision));
431✔
148
            char * colName = (char *) row[TSDB_DESCRIBE_METRIC_FIELD_INDEX];
431✔
149
            if (!searchBArray(stbInfo->cols, colName,
431✔
150
                              lengths[TSDB_DESCRIBE_METRIC_FIELD_INDEX], colType)) {
151
                errorPrint("existing stable col:%s not found in JSON cols.\n", colName);
×
152
                taos_free_result(res);
×
153
                closeBenchConn(conn);
×
154
                return TSDB_CODE_FAILED;
×
155
            }
156
            col_count += 1;
431✔
157
        }
158
    }  // end while
159
    taos_free_result(res);
202✔
160
    closeBenchConn(conn);
202✔
161
    if (tag_count != stbInfo->tags->size) {
202✔
162
        errorPrint("%s", "existing stable tag mismatched with that defined in JSON\n");
×
163
        return TSDB_CODE_FAILED;
×
164
    }
165

166
    if (col_count != stbInfo->cols->size) {
202✔
167
        errorPrint("%s", "existing stable column mismatched with that defined in JSON\n");
×
168
        return TSDB_CODE_FAILED;
×
169
    }
170

171
    return TSDB_CODE_SUCCESS;
202✔
172
}
173

174

175
// Builds "DESCRIBE `<db>`.`<stable>`" and hands it to the REST or the native
// lookup, depending on stbInfo->iface. The lookup's return code is passed
// through unchanged.
static int getSuperTableFromServer(SDataBase* database, SSuperTable* stbInfo) {
    char describeSql[SHORT_1K_SQL_BUFF_LEN] = "\0";

    (void)snprintf(describeSql, SHORT_1K_SQL_BUFF_LEN,
             "DESCRIBE `%s`.`%s`", database->dbName,
             stbInfo->stbName);

    if (REST_IFACE != stbInfo->iface) {
        return getSuperTableFromServerTaosc(database, stbInfo, describeSql);
    }
    return getSuperTableFromServerRest(database, stbInfo, describeSql);
}
190

191
static int queryDbExec(SDataBase *database,
37,437✔
192
                       SSuperTable *stbInfo, char *command) {
193
    int ret = 0;
37,437✔
194
    if (isRest(stbInfo->iface)) {
37,437✔
195
        if (0 != convertServAddr(stbInfo->iface, false, 1)) {
14✔
196
            errorPrint("%s", "Failed to convert server address\n");
×
197
            return -1;
×
198
        }
199
        int sockfd = createSockFd();
14✔
200
        if (sockfd < 0) {
14✔
201
            ret = -1;
×
202
        } else {
203
            ret = queryDbExecRest(command,
14✔
204
                              database->dbName,
205
                              database->precision,
206
                              stbInfo->iface,
14✔
207
                              stbInfo->lineProtocol,
14✔
208
                              stbInfo->tcpTransfer,
14✔
209
                              sockfd);
210
            destroySockFd(sockfd);
14✔
211
        }
212
    } else {
213
        SBenchConn* conn = initBenchConn(database->dbName);
37,423✔
214
        if (NULL == conn) {
37,423✔
215
            ret = -1;
×
216
        } else {
217
            ret = queryDbExecCall(conn, command);
37,423✔
218
            int32_t trying = g_arguments->keep_trying;
37,423✔
219
            while (ret && trying) {
37,423✔
220
                infoPrint("will sleep %"PRIu32" milliseconds then re-execute command: %s\n",
×
221
                          g_arguments->trying_interval, command);
222
                toolsMsleep(g_arguments->trying_interval);
×
223
                ret = queryDbExecCall(conn, command);
×
224
                if (trying != -1) {
×
225
                    trying--;
×
226
                }
227
            }
228
            if (0 != ret) {
37,423✔
229
                ret = -1;
×
230
            }
231
            closeBenchConn(conn);
37,423✔
232
        }
233
    }
234

235
    return ret;
37,437✔
236
}
237

238
// Writes the compression clauses of `col` into `buf`, in the order
// "encode '<v>' ", "compress '<v>' ", "level '<v>' ", skipping every setting
// whose string is empty.
//
// Returns the number of characters written (0 when nothing is set).
//
// The writes are unbounded: the caller must supply a buffer large enough for
// the three values plus the fixed keyword/quote text (31 characters) and the
// terminating NUL.
int getCompressStr(Field* col, char* buf) {
    int written = 0;

    if (col->encode[0] != '\0') {
        written += sprintf(buf + written, "encode \'%s\' ", col->encode);
    }

    if (col->compress[0] != '\0') {
        written += sprintf(buf + written, "compress \'%s\' ", col->compress);
    }

    if (col->level[0] != '\0') {
        written += sprintf(buf + written, "level \'%s\' ", col->level);
    }

    return written;
}
252

253
// Returns the write offset that follows an snprintf() call which reported
// `n` characters into a buffer of `capacity` bytes already filled up to
// `offset`, or -1 when snprintf() failed or its output was truncated.
static int stbAdvanceOffset(int offset, int capacity, int n) {
    if (n < 0 || n >= capacity - offset) {
        return -1;
    }
    return offset + n;
}

// Builds and executes the CREATE TABLE statement for a super table.
//
// Steps:
//   1. nothing is done (return 0) when g_arguments->supplementInsert is set;
//   2. the column list is rendered and also stored, prefixed with the
//      timestamp column, in stbInfo->colsOfCreateChildTable;
//   3. when the definition has no tags the function stops here with 0 and
//      no statement is sent;
//   4. otherwise the tag list and the optional COMMENT / DELAY / FILE_FACTOR /
//      ROLLUP / MAX_DELAY / WATERMARK / SMA clauses are appended and the
//      statement is run through queryDbExec().
//
// Returns 0 on success, queryDbExec()'s result for the final statement, or
// -1 when a geometry field is shorter than 21 or any part of the statement
// does not fit its buffer (a truncated statement is never sent).
// All temporary buffers are released on every path.
static int createSuperTable(SDataBase* database, SSuperTable* stbInfo) {
    if (g_arguments->supplementInsert) {
        return 0;
    }

    // fixed text getCompressStr() adds around the three values:
    // "encode '' " + "compress '' " + "level '' " = 31 characters, plus NUL
    enum { COMPRESS_KEYWORDS_LEN = 32 };

    int       ret = -1;
    int       len = 0;
    int       length = 0;
    int       n = 0;
    int       tagIndex = 0;
    bool      tagsEndWithComma = false;
    bool      first_sma = true;
    uint32_t  tag_buffer_len = 0;
    char     *tagsBuf = NULL;
    uint32_t  col_buffer_len =
        (TSDB_COL_NAME_LEN + 15 + COMP_NAME_LEN*3 + COMPRESS_KEYWORDS_LEN) * stbInfo->cols->size;
    char     *colsBuf = benchCalloc(1, col_buffer_len, false);
    char     *command = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, false);

    for (int colIndex = 0; colIndex < stbInfo->cols->size; colIndex++) {
        Field * col = benchArrayGet(stbInfo->cols, colIndex);

        if (col->type == TSDB_DATA_TYPE_GEOMETRY && col->length < 21) {
            errorPrint("%s() LN%d, geometry filed len must be greater than 21 on %d\n", __func__, __LINE__,
                       colIndex);
            goto cleanup;
        }

        if (col->type == TSDB_DATA_TYPE_BINARY ||
            col->type == TSDB_DATA_TYPE_NCHAR ||
            col->type == TSDB_DATA_TYPE_VARBINARY ||
            col->type == TSDB_DATA_TYPE_GEOMETRY) {
            n = snprintf(colsBuf + len, col_buffer_len - len,
                    ",%s %s(%d)", col->name,
                    convertDatatypeToString(col->type), col->length);
        } else if (col->type == TSDB_DATA_TYPE_DECIMAL
                || col->type == TSDB_DATA_TYPE_DECIMAL64) {
            n = snprintf(colsBuf + len, col_buffer_len - len,
                    ",%s %s(%d,%d)", col->name,
                    convertDatatypeToString(col->type), col->precision, col->scale);
        } else {
            n = snprintf(colsBuf + len, col_buffer_len - len,
                    ",%s %s", col->name,
                    convertDatatypeToString(col->type));
        }
        // every piece is validated before the offset moves, so the remaining
        // size passed to the next snprintf() can never wrap around
        len = stbAdvanceOffset(len, (int)col_buffer_len, n);

        // primary key
        if (len >= 0 && stbInfo->primary_key && colIndex == 0) {
            n = snprintf(colsBuf + len, col_buffer_len - len, " %s", PRIMARY_KEY);
            len = stbAdvanceOffset(len, (int)col_buffer_len, n);
        }

        // compress key
        char keys[COMP_NAME_LEN*3 + COMPRESS_KEYWORDS_LEN] = "";
        if (len >= 0 && getCompressStr(col, keys) > 0) {
            n = snprintf(colsBuf + len, col_buffer_len - len, " %s", keys);
            len = stbAdvanceOffset(len, (int)col_buffer_len, n);
        }

        if (len < 0) {
            errorPrint("%s() LN%d, snprintf overflow on %d\n",
                       __func__, __LINE__, colIndex);
            goto cleanup;
        }
    }

    // save for creating child table
    stbInfo->colsOfCreateChildTable =
        (char *)benchCalloc(len + TIMESTAMP_BUFF_LEN, 1, true);

    (void)snprintf(stbInfo->colsOfCreateChildTable, len + TIMESTAMP_BUFF_LEN,
             "(%s timestamp%s)", stbInfo->primaryKeyName, colsBuf);

    if (stbInfo->tags->size == 0) {
        // no tags: nothing is sent to the server from here
        ret = 0;
        goto cleanup;
    }

    tag_buffer_len = (TSDB_COL_NAME_LEN + 15) * stbInfo->tags->size;
    tagsBuf = benchCalloc(1, tag_buffer_len, false);

    len = stbAdvanceOffset(0, (int)tag_buffer_len,
                           snprintf(tagsBuf, tag_buffer_len, "("));
    if (len < 0) {
        goto overflow;
    }

    for (tagIndex = 0; tagIndex < stbInfo->tags->size; tagIndex++) {
        Field *tag = benchArrayGet(stbInfo->tags, tagIndex);

        if (tag->type == TSDB_DATA_TYPE_GEOMETRY && tag->length < 21) {
            errorPrint("%s() LN%d, geometry filed len must be greater than 21 on %d\n", __func__, __LINE__,
                       tagIndex);
            goto cleanup;
        }

        if (tag->type == TSDB_DATA_TYPE_JSON) {
            n = snprintf(tagsBuf + len, tag_buffer_len - len,
                    "%s json", tag->name);
            len = stbAdvanceOffset(len, (int)tag_buffer_len, n);
            if (len < 0) {
                errorPrint("%s() LN%d snprintf overflow on %d\n",
                       __func__, __LINE__, tagIndex);
                goto cleanup;
            }
            // a json tag ends the list: no trailing comma to strip and any
            // tag after it is not rendered
            tagsEndWithComma = false;
            break;
        }

        if (tag->type == TSDB_DATA_TYPE_BINARY ||
            tag->type == TSDB_DATA_TYPE_NCHAR ||
            tag->type == TSDB_DATA_TYPE_VARBINARY ||
            tag->type == TSDB_DATA_TYPE_GEOMETRY) {
            n = snprintf(tagsBuf + len, tag_buffer_len - len,
                    "%s %s(%d),", tag->name,
                    convertDatatypeToString(tag->type), tag->length);
        } else {
            n = snprintf(tagsBuf + len, tag_buffer_len - len,
                    "%s %s,", tag->name,
                    convertDatatypeToString(tag->type));
        }

        len = stbAdvanceOffset(len, (int)tag_buffer_len, n);
        if (len < 0) {
            errorPrint("%s() LN%d snprintf overflow on %d\n",
                       __func__, __LINE__, tagIndex);
            goto cleanup;
        }
        tagsEndWithComma = true;
    }

    if (tagsEndWithComma) {
        len -= 1;  // overwrite the trailing comma with the closing parenthesis
    }
    len = stbAdvanceOffset(len, (int)tag_buffer_len,
                           snprintf(tagsBuf + len, tag_buffer_len - len, ")"));
    if (len < 0) {
        goto overflow;
    }

    length = stbAdvanceOffset(0, (int)TSDB_MAX_ALLOWED_SQL_LEN, snprintf(
        command, TSDB_MAX_ALLOWED_SQL_LEN,
        g_arguments->escape_character
            ? "CREATE TABLE IF NOT EXISTS `%s`.`%s` (%s TIMESTAMP%s) TAGS %s"
            : "CREATE TABLE IF NOT EXISTS %s.%s (%s TIMESTAMP%s) TAGS %s",
        database->dbName, stbInfo->stbName, stbInfo->primaryKeyName, colsBuf, tagsBuf));
    if (length < 0) {
        goto overflow;
    }

    if (stbInfo->comment != NULL) {
        n = snprintf(command + length, TSDB_MAX_ALLOWED_SQL_LEN - length,
                     " COMMENT '%s'", stbInfo->comment);
        length = stbAdvanceOffset(length, (int)TSDB_MAX_ALLOWED_SQL_LEN, n);
        if (length < 0) {
            goto overflow;
        }
    }
    if (stbInfo->delay >= 0) {
        n = snprintf(command + length, TSDB_MAX_ALLOWED_SQL_LEN - length,
                     " DELAY %d", stbInfo->delay);
        length = stbAdvanceOffset(length, (int)TSDB_MAX_ALLOWED_SQL_LEN, n);
        if (length < 0) {
            goto overflow;
        }
    }
    if (stbInfo->file_factor >= 0) {
        n = snprintf(command + length, TSDB_MAX_ALLOWED_SQL_LEN - length,
                     " FILE_FACTOR %f", (float)stbInfo->file_factor / 100);
        length = stbAdvanceOffset(length, (int)TSDB_MAX_ALLOWED_SQL_LEN, n);
        if (length < 0) {
            goto overflow;
        }
    }
    if (stbInfo->rollup != NULL) {
        n = snprintf(command + length, TSDB_MAX_ALLOWED_SQL_LEN - length,
                     " ROLLUP(%s)", stbInfo->rollup);
        length = stbAdvanceOffset(length, (int)TSDB_MAX_ALLOWED_SQL_LEN, n);
        if (length < 0) {
            goto overflow;
        }
    }
    if (stbInfo->max_delay != NULL) {
        n = snprintf(command + length, TSDB_MAX_ALLOWED_SQL_LEN - length,
                     " MAX_DELAY %s", stbInfo->max_delay);
        length = stbAdvanceOffset(length, (int)TSDB_MAX_ALLOWED_SQL_LEN, n);
        if (length < 0) {
            goto overflow;
        }
    }
    if (stbInfo->watermark != NULL) {
        n = snprintf(command + length, TSDB_MAX_ALLOWED_SQL_LEN - length,
                     " WATERMARK %s", stbInfo->watermark);
        length = stbAdvanceOffset(length, (int)TSDB_MAX_ALLOWED_SQL_LEN, n);
        if (length < 0) {
            goto overflow;
        }
    }

    // not support ttl in super table

    for (int i = 0; i < stbInfo->cols->size; i++) {
        Field * col = benchArrayGet(stbInfo->cols, i);
        if (!col->sma) {
            continue;
        }
        n = snprintf(command + length, TSDB_MAX_ALLOWED_SQL_LEN - length,
                     first_sma ? " SMA(%s" : ",%s", col->name);
        first_sma = false;
        length = stbAdvanceOffset(length, (int)TSDB_MAX_ALLOWED_SQL_LEN, n);
        if (length < 0) {
            errorPrint("%s() LN%d snprintf overflow on %d iteral\n", __func__, __LINE__, i);
            goto cleanup;
        }
    }
    if (!first_sma) {
        n = snprintf(command + length, TSDB_MAX_ALLOWED_SQL_LEN - length, ")");
        length = stbAdvanceOffset(length, (int)TSDB_MAX_ALLOWED_SQL_LEN, n);
        if (length < 0) {
            goto overflow;
        }
    }
    debugPrint("create stable: <%s>\n", command);

    ret = queryDbExec(database, stbInfo, command);
    goto cleanup;

overflow:
    errorPrint("%s() LN%d snprintf overflow\n",
               __func__, __LINE__);
cleanup:
    free(colsBuf);
    free(tagsBuf);
    free(command);
    return ret;
}
469

470

471
// Queries the vgroups of `database` over `conn` and records them.
//
// The SHOW ... VGROUPS statement is run twice: the first pass counts the
// rows, stores the count in database->vgroups and fills database->vgArray
// with that many zeroed SVGroup entries; the second pass copies the first
// field of each row into the matching entry's vgId.
//
// Returns the number of vgroups, or -1 when a query fails or the second
// pass cannot be matched to the array. `conn` stays open in every case.
int32_t getVgroupsNative(SBenchConn *conn, SDataBase *database) {
    int     vgroups = 0;
    char    cmd[SHORT_1K_SQL_BUFF_LEN] = "\0";
    (void)snprintf(cmd, SHORT_1K_SQL_BUFF_LEN,
            g_arguments->escape_character
            ? "SHOW `%s`.VGROUPS"
            : "SHOW %s.VGROUPS",
            database->dbName);

    TAOS_RES* res = taos_query(conn->taos, cmd);
    int code = taos_errno(res);
    if (code) {
        // NOTE(review): res is handed to printErrCmdCodeStr(); confirm that
        // helper releases it before adding a free here
        printErrCmdCodeStr(cmd, code, res);
        return -1;
    }

    TAOS_ROW row = NULL;
    while ((row = taos_fetch_row(res)) != NULL) {
        vgroups++;
    }
    debugPrint("%s() LN%d, vgroups: %d\n", __func__, __LINE__, vgroups);
    taos_free_result(res);

    database->vgroups = vgroups;
    database->vgArray = benchArrayInit(vgroups, sizeof(SVGroup));
    for (int32_t v = 0; (v < vgroups
            && !g_arguments->terminate); v++) {
        SVGroup *vg = benchCalloc(1, sizeof(SVGroup), true);
        (void)benchArrayPush(database->vgArray, vg);
    }

    res = taos_query(conn->taos, cmd);
    code = taos_errno(res);
    if (code) {
        printErrCmdCodeStr(cmd, code, res);
        return -1;
    }

    // The second result set is independent of the first one; never read more
    // rows than entries were prepared, and never write through a missing
    // entry or read a NULL field.
    int32_t vgItem = 0;
    while (vgItem < vgroups
            && ((row = taos_fetch_row(res)) != NULL)
            && !g_arguments->terminate) {
        SVGroup *vg = benchArrayGet(database->vgArray, vgItem);
        if (vg == NULL || row[0] == NULL) {
            errorPrint("%s() LN%d, failed to read vgroup %d of database %s\n",
                       __func__, __LINE__, vgItem, database->dbName);
            taos_free_result(res);
            return -1;
        }
        vg->vgId = *(int32_t*)row[0];
        vgItem++;
    }
    taos_free_result(res);

    return vgroups;
}
520

521
// Renders the CREATE DATABASE statement for `database` into `command`.
//
// `command` must provide at least SHORT_1K_SQL_BUFF_LEN bytes; every write
// is bounded by that size.
//
// Layout: "CREATE DATABASE IF NOT EXISTS <name>", then one " <name> <value>"
// pair per entry of database->cfgs (the "vgroups" entry is held back), then
// " VGROUPS <n>" when a positive count is known (the command-line value in
// g_arguments->inputted_vgroups wins over the config entry), then the
// PRECISION clause for milli/micro/nano precision.
//
// `remainVnodes` is accepted for interface compatibility and not used.
//
// Returns the length of the statement up to, but not including, the
// PRECISION clause, or -1 when any part does not fit the buffer.
int geneDbCreateCmd(SDataBase *database, char *command, int remainVnodes) {
    const int capacity = SHORT_1K_SQL_BUFF_LEN;
    int dataLen = 0;
    int n;

    (void)remainVnodes;

    // create database
    n = snprintf(command + dataLen, capacity - dataLen,
                g_arguments->escape_character
                    ? "CREATE DATABASE IF NOT EXISTS `%s`"
                    : "CREATE DATABASE IF NOT EXISTS %s",
                    database->dbName);

    if (n < 0 || n >= capacity - dataLen) {
        errorPrint("%s() LN%d snprintf overflow\n",
                           __func__, __LINE__);
        return -1;
    }
    dataLen += n;

    int vgroups = g_arguments->inputted_vgroups;

    // append config items
    if (database->cfgs) {
        for (int i = 0; i < database->cfgs->size; i++) {
            SDbCfg* cfg = benchArrayGet(database->cfgs, i);

            // check vgroups
            if (trimCaseCmp(cfg->name, "vgroups") == 0) {
                if (vgroups > 0) {
                    // inputted vgroups by commandline
                    infoPrint("ignore config set vgroups %d\n", cfg->valueint);
                } else {
                    vgroups = cfg->valueint;
                }
                continue;
            }

            if (cfg->valuestring) {
                n = snprintf(command + dataLen, capacity - dataLen,
                            " %s %s", cfg->name, cfg->valuestring);
            } else {
                n = snprintf(command + dataLen, capacity - dataLen,
                            " %s %d", cfg->name, cfg->valueint);
            }
            if (n < 0 || n >= capacity - dataLen) {
                errorPrint("%s() LN%d snprintf overflow on %d\n",
                           __func__, __LINE__, i);
                return -1;
            }
            dataLen += n;
        }
    }

    // vgroups from the command line or from the config items
    if (vgroups > 0) {
        n = snprintf(command + dataLen, capacity - dataLen, " VGROUPS %d", vgroups);
        if (n < 0 || n >= capacity - dataLen) {
            errorPrint("%s() LN%d snprintf overflow\n",
                       __func__, __LINE__);
            return -1;
        }
        dataLen += n;
    }

    const char *precisionClause = NULL;
    switch (database->precision) {
        case TSDB_TIME_PRECISION_MILLI:
            precisionClause = " PRECISION \'ms\';";
            break;
        case TSDB_TIME_PRECISION_MICRO:
            precisionClause = " PRECISION \'us\';";
            break;
        case TSDB_TIME_PRECISION_NANO:
            precisionClause = " PRECISION \'ns\';";
            break;
        default:
            // any other precision value: no clause is appended
            break;
    }
    if (precisionClause != NULL) {
        n = snprintf(command + dataLen, capacity - dataLen, "%s", precisionClause);
        if (n < 0 || n >= capacity - dataLen) {
            errorPrint("%s() LN%d snprintf overflow\n",
                       __func__, __LINE__);
            return -1;
        }
    }

    return dataLen;
}
599

600
// Drops and re-creates `database` through the REST interface.
//
// A failing DROP is only logged. The CREATE statement is generated with
// geneDbCreateCmd() and posted; while it keeps failing it is re-posted up to
// g_arguments->keep_trying times (-1 means retry without limit), sleeping
// g_arguments->trying_interval milliseconds between attempts.
//
// Returns 0 on success, -1 when no socket could be opened or the CREATE
// statement could not be generated, otherwise the last postProcessSql()
// result.
int createDatabaseRest(SDataBase* database) {
    int32_t code = 0;
    char       command[SHORT_1K_SQL_BUFF_LEN] = "\0";

    int sockfd = createSockFd();
    if (sockfd < 0) {
        return -1;
    }

    // drop exist database
    (void)snprintf(command, SHORT_1K_SQL_BUFF_LEN,
            g_arguments->escape_character
                ? "DROP DATABASE IF EXISTS `%s`;"
                : "DROP DATABASE IF EXISTS %s;",
             database->dbName);
    code = postProcessSql(command,
                        database->dbName,
                        database->precision,
                        REST_IFACE,
                        0,
                        g_arguments->port,
                        false,
                        sockfd,
                        NULL);
    if (code != 0) {
        errorPrint("Failed to drop database %s\n", database->dbName);
    }

    // create database
    int remainVnodes = INT_MAX;
    if (geneDbCreateCmd(database, command, remainVnodes) < 0) {
        // never post a statement that did not fit the buffer
        errorPrint("Failed to generate create database command for %s\n",
                   database->dbName);
        destroySockFd(sockfd);
        return -1;
    }

    int32_t trying = g_arguments->keep_trying;
    for (;;) {
        code = postProcessSql(command,
                        database->dbName,
                        database->precision,
                        REST_IFACE,
                        0,
                        g_arguments->port,
                        false,
                        sockfd,
                        NULL);
        if (code == 0 || trying == 0) {
            break;
        }
        infoPrint("will sleep %"PRIu32" milliseconds then "
                "re-create database %s\n",
                g_arguments->trying_interval, database->dbName);
        toolsMsleep(g_arguments->trying_interval);
        if (trying != -1) {
            trying--;
        }
    }

    destroySockFd(sockfd);
    return code;
}
663

664
// Runs "SHOW DNODES" on `conn` and sums, over all returned rows, the fourth
// field minus the third field (both read as int16_t).
// NOTE(review): presumably these are the supported and the in-use vnode
// counts of each dnode; verify against the SHOW DNODES column layout.
//
// Returns that sum. When the query fails the error is printed, `conn` is
// CLOSED by this function, and -1 is returned; the caller must not use or
// close the connection again in that case.
int32_t getRemainVnodes(SBenchConn *conn) {
    char command[SHORT_1K_SQL_BUFF_LEN] = "SHOW DNODES";

    TAOS_RES *res = taos_query(conn->taos, command);
    int32_t   code = taos_errno(res);
    if (code) {
        printErrCmdCodeStr(command, code, res);
        closeBenchConn(conn);
        return -1;
    }

    int remainVnodes = 0;
    for (TAOS_ROW dnode = taos_fetch_row(res);
         dnode != NULL;
         dnode = taos_fetch_row(res)) {
        int16_t field3 = *(int16_t*)(dnode[3]);
        int16_t field2 = *(int16_t*)(dnode[2]);
        remainVnodes += (field3 - field2);
    }
    debugPrint("%s() LN%d, remainVnodes: %d\n",
               __func__, __LINE__, remainVnodes);

    taos_free_result(res);
    return remainVnodes;
}
684

685
// Drops and re-creates `database` over a native connection.
//
// Steps: drop every stream flagged with `drop`, drop the old database,
// (with g_arguments->bind_vgroup) make sure vnodes are still available,
// generate and run the CREATE DATABASE statement with the keep_trying /
// trying_interval retry policy, then (with bind_vgroup) load the vgroup list
// into `database`.
//
// Returns 0 on success and -1 on any failure. The connection opened here is
// closed on every path.
int createDatabaseTaosc(SDataBase* database) {
    char command[SHORT_1K_SQL_BUFF_LEN] = "\0";
    // conn
    SBenchConn* conn = initBenchConn(NULL);
    if (NULL == conn) {
        return -1;
    }

    // drop stream in old database
    for (int i = 0; i < g_arguments->streams->size; i++) {
        SSTREAM* stream = benchArrayGet(g_arguments->streams, i);
        if (stream->drop) {
            (void)snprintf(command, SHORT_1K_SQL_BUFF_LEN,
                        "DROP STREAM IF EXISTS %s;",
                    stream->stream_name);
            if (queryDbExecCall(conn, command)) {
                closeBenchConn(conn);
                return -1;
            }
            infoPrint("%s\n", command);
            memset(command, 0, SHORT_1K_SQL_BUFF_LEN);
        }
    }

    // drop old database
    (void)snprintf(command, SHORT_1K_SQL_BUFF_LEN,
            g_arguments->escape_character
                ? "DROP DATABASE IF EXISTS `%s`;":
            "DROP DATABASE IF EXISTS %s;",
             database->dbName);
    if (0 != queryDbExecCall(conn, command)) {
        if (g_arguments->dsn) {
            // websocket
            // NOTE(review): the message says the failure is ignored, yet the
            // function still fails below; confirm which one is intended
            warnPrint("%s", "TDengine cloud normal users have no privilege "
                      "to drop database! DROP DATABASE failure is ignored!\n");
        }
        closeBenchConn(conn);
        return -1;
    }

    // get remain vgroups
    int remainVnodes = INT_MAX;
    if (g_arguments->bind_vgroup) {
        remainVnodes = getRemainVnodes(conn);
        if (0 >= remainVnodes) {
            errorPrint("Remain vnodes %d, failed to create database\n",
                       remainVnodes);
            // getRemainVnodes() closes conn itself when its query fails and
            // it returns -1; for any other non-positive count the connection
            // is still open and has to be closed here
            if (remainVnodes != -1) {
                closeBenchConn(conn);
            }
            return -1;
        }
    }

    // generate and execute create database sql
    if (geneDbCreateCmd(database, command, remainVnodes) < 0) {
        // never execute a statement that did not fit the buffer
        errorPrint("Failed to generate create database command for %s\n",
                   database->dbName);
        closeBenchConn(conn);
        return -1;
    }
    int32_t code = queryDbExecCall(conn, command);
    int32_t trying = g_arguments->keep_trying;
    while (code && trying) {
        infoPrint("will sleep %"PRIu32" milliseconds then "
                  "re-create database %s\n",
                  g_arguments->trying_interval, database->dbName);
        toolsMsleep(g_arguments->trying_interval);
        code = queryDbExecCall(conn, command);
        if (trying != -1) {
            trying--;
        }
    }

    if (code) {
        if (g_arguments->dsn) {
            warnPrint("%s", "TDengine cloud normal users have no privilege "
                      "to create database! CREATE DATABASE "
                      "failure is ignored!\n");
        }

        closeBenchConn(conn);
        errorPrint("\ncreate database %s failed!\n\n", database->dbName);
        return -1;
    }
    infoPrint("command to create database: <%s>\n", command);

    // malloc and get vgroup
    if (g_arguments->bind_vgroup) {
        int32_t vgroups;
        vgroups = getVgroupsNative(conn, database);
        if (vgroups <= 0) {
            closeBenchConn(conn);
            errorPrint("Database %s's vgroups is %d\n",
                        database->dbName, vgroups);
            return -1;
        }
    }

    closeBenchConn(conn);
    return 0;
}
780

781
/*
 * Create one database through the interface selected in g_arguments.
 *
 * When isRest() accepts g_arguments->iface the request is handed to
 * createDatabaseRest(), otherwise to createDatabaseTaosc(). The value
 * returned by the chosen helper is passed back to the caller unchanged.
 */
int createDatabase(SDataBase* database) {
    if (isRest(g_arguments->iface)) {
        return createDatabaseRest(database);
    }
    return createDatabaseTaosc(database);
}
790

791
static int generateChildTblName(int len, char *buffer, SDataBase *database,
13,813,901✔
792
                                SSuperTable *stbInfo, uint64_t tableSeq, char* tagData, int i,
793
                                char *ttl, char *tableName) {
794
    if (0 == len) {
13,813,901✔
795
        memset(buffer, 0, TSDB_MAX_ALLOWED_SQL_LEN);
12,757,013✔
796
        len += snprintf(buffer + len,
12,766,955✔
797
                        TSDB_MAX_ALLOWED_SQL_LEN - len, "CREATE TABLE");
12,757,013✔
798
    }
799

800
    const char *tagStart = tagData + i * stbInfo->lenOfTags;  // start position of current row's TAG
13,823,843✔
801
    const char *tagsForSQL = tagStart;  // actual TAG part for SQL
13,825,468✔
802
    const char *fmt = g_arguments->escape_character ?
13,861,606✔
803
        " IF NOT EXISTS `%s`.`%s` USING `%s`.`%s` TAGS (%s) %s " :
13,825,025✔
804
        " IF NOT EXISTS %s.%s USING %s.%s TAGS (%s) %s ";
805

806
    if (stbInfo->useTagTableName) {
13,825,025✔
807
        char *firstComma = strchr(tagStart, ',');
312✔
808
        if (firstComma == NULL && tagStart >= firstComma) {
312✔
809
            errorPrint("Invalid tag data format: %s\n", tagStart);
×
810
            return -1;
×
811
        }
812
        size_t nameLen = firstComma - tagStart;
312✔
813
        strncpy(tableName, tagStart, nameLen);
312✔
814
        tableName[nameLen] = '\0';
312✔
815
        tagsForSQL = firstComma + 1;
312✔
816
    } else {
817
        // generate table name using prefix + sequence number
818
        (void)snprintf(tableName, TSDB_TABLE_NAME_LEN, "%s%" PRIu64,
13,825,156✔
819
                 stbInfo->childTblPrefix, tableSeq);
820
        tagsForSQL = tagStart;
13,825,156✔
821

822
    }
823

824
    len += snprintf(buffer + len, TSDB_MAX_ALLOWED_SQL_LEN - len, fmt,
13,825,468✔
825
                    database->dbName, tableName,
826
                    database->dbName, stbInfo->stbName,
827
                    tagsForSQL, ttl);
828
    debugPrint("create table: <%s> <%s>\n", buffer, tableName);
13,823,760✔
829
    return len;
13,823,711✔
830
}
831

832
static int getBatchOfTblCreating(threadInfo *pThreadInfo,
13,824,115✔
833
                                         SSuperTable *stbInfo) {
834
    BArray *batchArray = stbInfo->batchTblCreatingNumbersArray;
13,824,115✔
835
    if (batchArray) {
13,824,066✔
836
        int *batch = benchArrayGet(
840✔
837
                batchArray, pThreadInfo->posOfTblCreatingBatch);
840✔
838
        pThreadInfo->posOfTblCreatingBatch++;
840✔
839
        if (pThreadInfo->posOfTblCreatingBatch == batchArray->size) {
840✔
840
            pThreadInfo->posOfTblCreatingBatch = 0;
168✔
841
        }
842
        return *batch;
840✔
843
    }
844
    return 0;
13,823,226✔
845
}
846

847
static int getIntervalOfTblCreating(threadInfo *pThreadInfo,
12,680,720✔
848
                                         SSuperTable *stbInfo) {
849
    BArray *intervalArray = stbInfo->batchTblCreatingIntervalsArray;
12,680,720✔
850
    if (intervalArray) {
12,679,756✔
851
        int *interval = benchArrayGet(
336✔
852
                intervalArray, pThreadInfo->posOfTblCreatingInterval);
336✔
853
        pThreadInfo->posOfTblCreatingInterval++;
336✔
854
        if (pThreadInfo->posOfTblCreatingInterval == intervalArray->size) {
336✔
855
            pThreadInfo->posOfTblCreatingInterval = 0;
84✔
856
        }
857
        return *interval;
336✔
858
    }
859
    return 0;
12,679,420✔
860
}
861

862
// table create thread
863
static void *createTable(void *sarg) {
172,853✔
864
    if (g_arguments->supplementInsert) {
172,853✔
865
        return NULL;
39✔
866
    }
867

868
    threadInfo  *pThreadInfo    = (threadInfo *)sarg;
172,814✔
869
    SDataBase   *database       = pThreadInfo->dbInfo;
172,814✔
870
    SSuperTable *stbInfo        = pThreadInfo->stbInfo;
172,814✔
871
    uint64_t    lastTotalCreate = 0;
172,814✔
872
    uint64_t    lastPrintTime   = toolsGetTimestampMs();
172,814✔
873
    int32_t     len             = 0;
172,814✔
874
    int32_t     batchNum        = 0;
172,814✔
875
    char ttl[SMALL_BUFF_LEN]    = "";
172,814✔
876

877
#ifdef LINUX
878
    prctl(PR_SET_NAME, "createTable");
172,814✔
879
#endif
880
    pThreadInfo->buffer = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, false);
172,772✔
881
    infoPrint(
172,814✔
882
              "thread[%d] start creating table from %" PRIu64 " to %" PRIu64
883
              "\n",
884
              pThreadInfo->threadID, pThreadInfo->start_table_from,
885
              pThreadInfo->end_table_to);
886
    if (stbInfo->ttl != 0) {
172,814✔
887
        (void)snprintf(ttl, SMALL_BUFF_LEN, "TTL %d", stbInfo->ttl);
1,068✔
888
    }
889

890
    // tag read from csv
891
    FILE *csvFile = openTagCsv(stbInfo, pThreadInfo->start_table_from);
172,814✔
892
    // malloc
893
    char* tagData = benchCalloc(TAG_BATCH_COUNT, stbInfo->lenOfTags, false);
172,814✔
894
    int         w = 0; // record tagData
172,814✔
895

896
    int smallBatchCount = 0;
172,814✔
897
    int index=  pThreadInfo->start_table_from;
172,814✔
898
    int tableSum = pThreadInfo->end_table_to - pThreadInfo->start_table_from + 1;
172,814✔
899
    if (stbInfo->useTagTableName) {
172,814✔
900
        pThreadInfo->childNames = benchCalloc(tableSum, sizeof(char *), false);
312✔
901
        pThreadInfo->childTblCount = tableSum;
312✔
902
    }
903

904
    for (uint64_t i = pThreadInfo->start_table_from, j = 0;
172,814✔
905
                  i <= pThreadInfo->end_table_to && !g_arguments->terminate;
13,987,116✔
906
                  i++, j++) {
13,814,302✔
907
        if (g_arguments->terminate) {
13,812,936✔
908
            goto create_table_end;
×
909
        }
910
        if (!stbInfo->use_metric || stbInfo->tags->size == 0) {
13,820,232✔
911
            if (stbInfo->childTblCount == 1) {
1,140✔
912
                (void)snprintf(pThreadInfo->buffer, TSDB_MAX_ALLOWED_SQL_LEN,
2,910✔
913
                         g_arguments->escape_character
1,455✔
914
                         ? "CREATE TABLE IF NOT EXISTS `%s`.`%s` %s;"
915
                         : "CREATE TABLE IF NOT EXISTS %s.%s %s;",
916
                         database->dbName, stbInfo->stbName,
917
                         stbInfo->colsOfCreateChildTable);
918
            } else {
919
                (void)snprintf(pThreadInfo->buffer, TSDB_MAX_ALLOWED_SQL_LEN,
3,102✔
920
                         g_arguments->escape_character
1,034✔
921
                         ? "CREATE TABLE IF NOT EXISTS `%s`.`%s` %s;"
922
                         : "CREATE TABLE IF NOT EXISTS %s.%s %s;",
923
                         database->dbName,
924
                         stbInfo->childTblArray[i]->name,
1,034✔
925
                         stbInfo->colsOfCreateChildTable);
926
            }
927
            batchNum++;
2,489✔
928
        } else {
929
            if (0 == len) {
13,812,707✔
930
                batchNum = 0;
12,759,275✔
931
            }
932
            // generator
933
            if (w == 0) {
13,812,707✔
934
                if(!generateTagData(stbInfo, tagData, TAG_BATCH_COUNT, csvFile, NULL, index)) {
280,677✔
935
                    goto create_table_end;
×
936
                }
937
            }
938
            char tbName[TSDB_TABLE_NAME_LEN] = {0};
13,812,707✔
939
            len = generateChildTblName(len, pThreadInfo->buffer,
13,801,296✔
940
                                       database, stbInfo, i, tagData, w, ttl, tbName);
941
            if (stbInfo->useTagTableName) {
13,822,850✔
942
                pThreadInfo->childNames[j] = strdup(tbName);
312✔
943
            }
944
            // move next
945
            if (++w >= TAG_BATCH_COUNT) {
13,823,762✔
946
                // reset for gen again
947
                w = 0;
119,738✔
948
                index += TAG_BATCH_COUNT;
119,738✔
949
            }
950

951
            batchNum++;
13,823,762✔
952
            smallBatchCount++;
13,823,762✔
953

954
            int smallBatch = getBatchOfTblCreating(pThreadInfo, stbInfo);
13,823,762✔
955
            if ((!smallBatch || (smallBatchCount == smallBatch))
13,823,270✔
956
                    && (batchNum < stbInfo->batchTblCreatingNum)
13,823,287✔
957
                    && ((TSDB_MAX_ALLOWED_SQL_LEN - len) >=
2,255,374✔
958
                        (stbInfo->lenOfTags + EXTRA_SQL_LEN))) {
1,137,101✔
959
                continue;
1,137,150✔
960
            } else {
961
                smallBatchCount = 0;
12,686,285✔
962
            }
963
        }
964

965
        len = 0;
12,689,087✔
966

967
        int ret = 0;
12,689,087✔
968
        debugPrint("thread[%d] creating table: %s\n", pThreadInfo->threadID,
12,689,087✔
969
                   pThreadInfo->buffer);
970
        // REST
971
        if (REST_IFACE == stbInfo->iface) {
12,688,254✔
972
            ret = queryDbExecRest(pThreadInfo->buffer,
×
973
                                  database->dbName,
974
                                  database->precision,
975
                                  stbInfo->iface,
×
976
                                  stbInfo->lineProtocol,
×
977
                                  stbInfo->tcpTransfer,
×
978
                                  pThreadInfo->sockfd);
979
        } else {
980
            ret = queryDbExecCall(pThreadInfo->conn, pThreadInfo->buffer);
12,686,873✔
981
            int32_t trying = g_arguments->keep_trying;
12,687,204✔
982
            while (ret && trying) {
12,683,529✔
983
                infoPrint("will sleep %"PRIu32" milliseconds then re-create "
×
984
                          "table %s\n",
985
                          g_arguments->trying_interval, pThreadInfo->buffer);
986
                toolsMsleep(g_arguments->trying_interval);
×
987
                ret = queryDbExecCall(pThreadInfo->conn, pThreadInfo->buffer);
×
988
                if (trying != -1) {
×
989
                    trying--;
×
990
                }
991
            }
992
        }
993

994
        if (0 != ret) {
12,686,201✔
995
            g_fail = true;
6,368✔
996
            goto create_table_end;
6,368✔
997
        }
998
        uint64_t intervalOfTblCreating = getIntervalOfTblCreating(pThreadInfo,
12,679,833✔
999
                                                                  stbInfo);
1000
        if (intervalOfTblCreating) {
12,678,058✔
1001
            debugPrint("will sleep %"PRIu64" milliseconds "
336✔
1002
                       "for table creating interval\n", intervalOfTblCreating);
1003
            toolsMsleep(intervalOfTblCreating);
336✔
1004
        }
1005

1006
        pThreadInfo->tables_created += batchNum;
12,678,058✔
1007
        batchNum = 0;
12,681,351✔
1008
        uint64_t currentPrintTime = toolsGetTimestampMs();
12,681,351✔
1009
        if (currentPrintTime - lastPrintTime > PRINT_STAT_INTERVAL) {
12,668,610✔
1010
            float speed = (pThreadInfo->tables_created - lastTotalCreate) * 1000 / (currentPrintTime - lastPrintTime);
7,296✔
1011
            infoPrint("thread[%d] already created %" PRId64 " tables, peroid speed: %.0f tables/s\n",
7,296✔
1012
                       pThreadInfo->threadID, pThreadInfo->tables_created, speed);
1013
            lastPrintTime   = currentPrintTime;
7,310✔
1014
            lastTotalCreate = pThreadInfo->tables_created;
7,310✔
1015
        }
1016
    }
1017

1018
    if (0 != len) {
166,393✔
1019
        int ret = 0;
78,637✔
1020
        debugPrint("thread[%d] creating table: %s\n", pThreadInfo->threadID,
78,637✔
1021
                   pThreadInfo->buffer);
1022
        // REST
1023
        if (REST_IFACE == stbInfo->iface) {
78,637✔
1024
            ret = queryDbExecRest(pThreadInfo->buffer,
112✔
1025
                                  database->dbName,
1026
                                  database->precision,
1027
                                  stbInfo->iface,
112✔
1028
                                  stbInfo->lineProtocol,
112✔
1029
                                  stbInfo->tcpTransfer,
112✔
1030
                                  pThreadInfo->sockfd);
1031
        } else {
1032
            ret = queryDbExecCall(pThreadInfo->conn, pThreadInfo->buffer);
78,525✔
1033
        }
1034
        if (0 != ret) {
78,637✔
1035
            g_fail = true;
×
1036
            goto create_table_end;
×
1037
        }
1038
        pThreadInfo->tables_created += batchNum;
78,637✔
1039
        debugPrint("thread[%d] already created %" PRId64 " tables\n",
78,637✔
1040
                   pThreadInfo->threadID, pThreadInfo->tables_created);
1041
    }
1042
create_table_end:
162,564✔
1043
    // free
1044
    tmfree(tagData);
172,761✔
1045
    tmfree(pThreadInfo->buffer);
172,761✔
1046
    pThreadInfo->buffer = NULL;
172,814✔
1047
    if(csvFile) {
172,814✔
1048
        fclose(csvFile);
816✔
1049
    }
1050
    return NULL;
172,814✔
1051
}
1052

1053
/*
 * Create the child tables of one super table with up to
 * g_arguments->table_threads worker threads running createTable().
 *
 * The table range starts at stbInfo->childTblFrom; its size is derived from
 * childTblTo / childTblFrom / childTblCount. The range is split as evenly
 * as possible, the first (ntables % threads) workers taking one extra table.
 * Each worker gets its own socket (REST_IFACE) or connection (otherwise).
 *
 * If a worker cannot be set up or started, no further workers are started,
 * but the ones already running are still joined before their shared
 * threadInfo array is released.
 *
 * After the join, each worker's tables_created is added to
 * g_arguments->actualChildTables, its connection is closed, and, when
 * stbInfo->useTagTableName is set, the names it generated are moved into
 * stbInfo->childTblArray in worker order.
 *
 * Returns 0 on success (including "nothing to create"), -1 when a worker
 * could not be started or g_fail is set.
 */
static int startMultiThreadCreateChildTable(SDataBase* database, SSuperTable* stbInfo) {
    int32_t code    = -1;
    int32_t threads = g_arguments->table_threads;
    int64_t ntables;

    // malloc stmtData for tags
    prepareTagsStmt(stbInfo);

    if (stbInfo->childTblTo > 0) {
        ntables = stbInfo->childTblTo - stbInfo->childTblFrom;
    } else if(stbInfo->childTblFrom > 0) {
        ntables = stbInfo->childTblCount - stbInfo->childTblFrom;
    } else {
        ntables = stbInfo->childTblCount;
    }
    // clamp first: the arrays below are sized by the thread count
    if (threads < 1) {
        threads = 1;
    }
    pthread_t   *pids = benchCalloc(threads, sizeof(pthread_t), false);
    threadInfo  *infos = benchCalloc(threads, sizeof(threadInfo), false);
    uint64_t     tableFrom = stbInfo->childTblFrom;
    if (ntables == 0) {
        code = 0;
        infoPrint("child table is zero, no need create. childTblCount: %"PRId64"\n", ntables);
        goto over;
    }

    int64_t div = ntables / threads;
    if (div < 1) {
        // fewer tables than threads: one table per thread
        threads = (int)ntables;
        div = 1;
    }
    int64_t mod = ntables % threads;

    bool startFailed = false;
    int  threadCnt = 0;  // workers actually started, i.e. the ones to join
    for (uint32_t i = 0; ((int32_t)i < threads && !g_arguments->terminate); i++) {
        threadInfo *pThreadInfo = infos + i;
        pThreadInfo->threadID = i;
        pThreadInfo->stbInfo = stbInfo;
        pThreadInfo->dbInfo = database;
        if (REST_IFACE == stbInfo->iface) {
            int sockfd = createSockFd();
            if (sockfd < 0) {
                startFailed = true;
                break;
            }
            pThreadInfo->sockfd = sockfd;
        } else {
            pThreadInfo->conn = initBenchConn(pThreadInfo->dbInfo->dbName);
            if (NULL == pThreadInfo->conn) {
                startFailed = true;
                break;
            }
        }
        pThreadInfo->start_table_from = tableFrom;
        pThreadInfo->ntables          = i < mod ? div + 1 : div;
        pThreadInfo->end_table_to     = i < mod ? tableFrom + div : tableFrom + div - 1;
        tableFrom = pThreadInfo->end_table_to + 1;
        pThreadInfo->tables_created = 0;
        debugPrint("div table by thread. i=%d from=%"PRId64" to=%"PRId64" ntable=%"PRId64"\n", i, pThreadInfo->start_table_from,
                                        pThreadInfo->end_table_to, pThreadInfo->ntables);
        if (pthread_create(pids + i, NULL, createTable, pThreadInfo) != 0) {
            errorPrint("failed to start thread[%" PRIu32 "] for creating "
                       "child tables\n", i);
            // this worker is not counted, so release its connection here
            if ((REST_IFACE != stbInfo->iface) && pThreadInfo->conn) {
                closeBenchConn(pThreadInfo->conn);
                pThreadInfo->conn = NULL;
            }
            startFailed = true;
            break;
        }
        threadCnt ++;
    }

    // wait for every started worker before touching or freeing infos
    for (int i = 0; i < threadCnt; i++) {
        pthread_join(pids[i], NULL);
    }

    if (g_arguments->terminate)  toolsMsleep(100);

    int nCount = 0;
    for (int i = 0; i < threadCnt; i++) {
        threadInfo *pThreadInfo = infos + i;
        g_arguments->actualChildTables += pThreadInfo->tables_created;

        if ((REST_IFACE != stbInfo->iface) && pThreadInfo->conn) {
            closeBenchConn(pThreadInfo->conn);
        }

        if (stbInfo->useTagTableName) {
            // hand the generated names over to the super table
            for (int j = 0; j < pThreadInfo->childTblCount; j++) {
                stbInfo->childTblArray[nCount++]->name = pThreadInfo->childNames[j];
            }
            tmfree(pThreadInfo->childNames);
        }
    }

    if (g_fail || startFailed) {
        goto over;
    }
    code = 0;
over:
    free(pids);
    free(infos);
    return code;
}
1148

1149
static int createChildTables() {
34,416✔
1150
    int32_t    code;
1151
    infoPrint("start creating %" PRId64 " table(s) with %d thread(s)\n",
34,416✔
1152
              g_arguments->totalChildTables, g_arguments->table_threads);
1153
    if (g_arguments->fpOfInsertResult) {
34,416✔
1154
        infoPrintToFile(
34,416✔
1155
                  "start creating %" PRId64 " table(s) with %d thread(s)\n",
1156
                  g_arguments->totalChildTables, g_arguments->table_threads);
1157
    }
1158
    int64_t start = (int64_t)toolsGetTimestampMs();
34,416✔
1159

1160
    for (int i = 0; (i < g_arguments->databases->size
66,302✔
1161
            && !g_arguments->terminate); i++) {
99,139✔
1162
        SDataBase * database = benchArrayGet(g_arguments->databases, i);
34,429✔
1163
        if (database->superTbls) {
34,429✔
1164
            for (int j = 0; (j < database->superTbls->size
71,869✔
1165
                    && !g_arguments->terminate); j++) {
111,096✔
1166
                SSuperTable * stbInfo = benchArrayGet(database->superTbls, j);
40,819✔
1167
                if (stbInfo->autoTblCreating || stbInfo->iface == SML_IFACE
40,819✔
1168
                    || stbInfo->iface == SML_REST_IFACE) {
35,589✔
1169
                    g_arguments->autoCreatedChildTables +=
9,859✔
1170
                            stbInfo->childTblCount;
5,426✔
1171
                    continue;
5,426✔
1172
                }
1173
                if (stbInfo->childTblExists) {
35,393✔
1174
                    g_arguments->existedChildTables +=
1,962✔
1175
                            stbInfo->childTblCount;
1,035✔
1176
                    continue;
1,035✔
1177
                }
1178
                debugPrint("colsOfCreateChildTable: %s\n",
34,358✔
1179
                        stbInfo->colsOfCreateChildTable);
1180

1181
                code = startMultiThreadCreateChildTable(database, stbInfo);
34,358✔
1182
                if (code && !g_arguments->terminate) {
33,562✔
1183
                    return code;
796✔
1184
                }
1185
            }
1186
        }
1187
    }
1188

1189
    int64_t end = toolsGetTimestampMs();
32,824✔
1190
    if(end == start) {
32,824✔
1191
        end += 1;
3,735✔
1192
    }
1193
    succPrint(
32,824✔
1194
            "Spent %.4f seconds to create %" PRId64
1195
            " table(s) with %d thread(s) speed: %.0f tables/s, already exist %" PRId64
1196
            " table(s), actual %" PRId64 " table(s) pre created, %" PRId64
1197
            " table(s) will be auto created\n",
1198
            (float)(end - start) / 1000.0,
1199
            g_arguments->totalChildTables,
1200
            g_arguments->table_threads,
1201
            g_arguments->actualChildTables * 1000 / (float)(end - start),
1202
            g_arguments->existedChildTables,
1203
            g_arguments->actualChildTables,
1204
            g_arguments->autoCreatedChildTables);
1205
    return 0;
32,824✔
1206
}
1207

1208
/*
 * Release one child table descriptor.
 *
 * childTbl - descriptor to free; the pointer is invalid afterwards
 * colsSize - number of entries of childTbl->childCols to visit
 *
 * Only a table flagged useOwnSample has its per-column statement buffers,
 * its childCols array and its sampleDataBuf released; the descriptor itself
 * is freed in every case.
 */
static void freeChildTable(SChildTable *childTbl, int colsSize) {
    if (childTbl->useOwnSample) {
        if (childTbl->childCols) {
            int idx = 0;
            while (idx < colsSize) {
                ChildField *ownCol = benchArrayGet(childTbl->childCols, idx);
                idx++;
                if (!ownCol) {
                    continue;
                }
                tmfree(ownCol->stmtData.data);
                ownCol->stmtData.data = NULL;
                tmfree(ownCol->stmtData.is_null);
                ownCol->stmtData.is_null = NULL;
                tmfree(ownCol->stmtData.lengths);
                ownCol->stmtData.lengths = NULL;
            }
            benchArrayDestroy(childTbl->childCols);
        }
        tmfree(childTbl->sampleDataBuf);
    }
    tmfree(childTbl);
}
34,236,301✔
1229

1230
void postFreeResource() {
35,644✔
1231
    infoPrint("%s\n", "free resource and exit ...");
35,644✔
1232
    if (!g_arguments->terminate) {
35,644✔
1233
        tmfclose(g_arguments->fpOfInsertResult);
34,834✔
1234
    }
1235

1236
    for (int i = 0; i < g_arguments->databases->size; i++) {
71,301✔
1237
        SDataBase * database = benchArrayGet(g_arguments->databases, i);
35,657✔
1238
        if (database->cfgs) {
35,657✔
1239
            for (int c = 0; c < database->cfgs->size; c++) {
88,107✔
1240
                SDbCfg *cfg = benchArrayGet(database->cfgs, c);
52,450✔
1241
                if (cfg->valuestring && cfg->free) {
52,450✔
1242
                    tmfree(cfg->valuestring);
67✔
1243
                    cfg->valuestring = NULL;
67✔
1244
                }
1245
            }
1246
            benchArrayDestroy(database->cfgs);
35,657✔
1247
        }
1248
        if (database->superTbls) {
35,657✔
1249
            for (uint64_t j = 0; j < database->superTbls->size; j++) {
77,833✔
1250
                SSuperTable * stbInfo = benchArrayGet(database->superTbls, j);
42,176✔
1251
                tmfree(stbInfo->colsOfCreateChildTable);
42,176✔
1252
                stbInfo->colsOfCreateChildTable = NULL;
42,176✔
1253
                tmfree(stbInfo->sampleDataBuf);
42,176✔
1254
                stbInfo->sampleDataBuf = NULL;
42,176✔
1255
                tmfree(stbInfo->partialColNameBuf);
42,176✔
1256
                stbInfo->partialColNameBuf = NULL;
42,176✔
1257
                benchArrayDestroy(stbInfo->batchTblCreatingNumbersArray);
42,176✔
1258
                benchArrayDestroy(stbInfo->batchTblCreatingIntervalsArray);
42,176✔
1259
                for (int k = 0; k < stbInfo->tags->size; k++) {
184,428✔
1260
                    Field * tag = benchArrayGet(stbInfo->tags, k);
142,252✔
1261
                    tmfree(tag->stmtData.data);
142,252✔
1262
                    tag->stmtData.data = NULL;
142,252✔
1263
                    tmfree(tag->stmtData.is_null);
142,252✔
1264
                    tag->stmtData.is_null = NULL;
142,252✔
1265
                    tmfree(tag->stmtData.lengths);
142,252✔
1266
                    tag->stmtData.lengths = NULL;
142,252✔
1267
                }
1268
                benchArrayDestroy(stbInfo->tags);
42,176✔
1269

1270
                for (int k = 0; k < stbInfo->cols->size; k++) {
16,726,088✔
1271
                    Field * col = benchArrayGet(stbInfo->cols, k);
16,683,912✔
1272
                    tmfree(col->stmtData.data);
16,683,912✔
1273
                    col->stmtData.data = NULL;
16,683,912✔
1274
                    tmfree(col->stmtData.is_null);
16,683,912✔
1275
                    col->stmtData.is_null = NULL;
16,683,912✔
1276
                    tmfree(col->stmtData.lengths);
16,683,912✔
1277
                    col->stmtData.lengths = NULL;
16,683,912✔
1278
                }
1279
                if (g_arguments->test_mode == INSERT_TEST) {
42,176✔
1280
                    if (stbInfo->childTblArray) {
40,396✔
1281
                        for (int64_t child = 0; child < stbInfo->childTblCount;
34,276,324✔
1282
                                child++) {
34,236,301✔
1283
                            SChildTable *childTbl = stbInfo->childTblArray[child];
34,236,301✔
1284
                            if (childTbl) {
34,236,301✔
1285
                                tmfree(childTbl->name);
34,236,301✔
1286
                                freeChildTable(childTbl, stbInfo->cols->size);
34,236,301✔
1287
                            }
1288
                        }
1289
                    }
1290
                }
1291
                benchArrayDestroy(stbInfo->cols);
42,176✔
1292
                tmfree(stbInfo->childTblArray);
42,176✔
1293
                stbInfo->childTblArray = NULL;
42,176✔
1294
                benchArrayDestroy(stbInfo->tsmas);
42,176✔
1295

1296
                // free sqls
1297
                if(stbInfo->sqls) {
42,176✔
1298
                    char **sqls = stbInfo->sqls;
660✔
1299
                    while (*sqls) {
3,324✔
1300
                        free(*sqls);
2,664✔
1301
                        sqls++;
2,664✔
1302
                    }
1303
                    tmfree(stbInfo->sqls);
660✔
1304
                }
1305

1306
                // thread_bind
1307
                if (database->vgArray) {
42,176✔
1308
                    for (int32_t v = 0; v < database->vgroups; v++) {
1,609✔
1309
                        SVGroup *vg = benchArrayGet(database->vgArray, v);
1,262✔
1310
                        tmfree(vg->childTblArray);
1,262✔
1311
                        vg->childTblArray = NULL;
1,262✔
1312
                    }
1313
                    benchArrayDestroy(database->vgArray);
347✔
1314
                    database->vgArray = NULL;
347✔
1315
                }
1316
            }
1317
            benchArrayDestroy(database->superTbls);
35,657✔
1318
        }
1319
    }
1320
    benchArrayDestroy(g_arguments->databases);
35,644✔
1321
    benchArrayDestroy(g_arguments->streams);
35,644✔
1322
    tools_cJSON_Delete(root);
35,644✔
1323
}
35,644✔
1324

1325
int32_t execInsert(threadInfo *pThreadInfo, uint32_t k, int64_t *delay3) {
30,234,042✔
1326
    SDataBase *  database = pThreadInfo->dbInfo;
30,234,042✔
1327
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
30,240,177✔
1328
    TAOS_RES *   res = NULL;
30,233,971✔
1329
    int32_t      code = 0;
30,233,971✔
1330
    uint16_t     iface = stbInfo->iface;
30,233,971✔
1331
    int64_t      start = 0;
30,241,383✔
1332
    int32_t      affectRows = 0;
30,241,383✔
1333

1334
    int32_t trying = (stbInfo->keep_trying)?
30,659,306✔
1335
        stbInfo->keep_trying:g_arguments->keep_trying;
30,227,684✔
1336
    int32_t trying_interval = stbInfo->trying_interval?
30,659,077✔
1337
        stbInfo->trying_interval:g_arguments->trying_interval;
30,228,186✔
1338
    int protocol = stbInfo->lineProtocol;
30,240,708✔
1339

1340
    switch (iface) {
30,220,618✔
1341
        case TAOSC_IFACE:
21,596,104✔
1342
            debugPrint("buffer: %s\n", pThreadInfo->buffer);
21,596,104✔
1343
            code = queryDbExecCall(pThreadInfo->conn, pThreadInfo->buffer);
21,595,962✔
1344
            while (code && trying && !g_arguments->terminate) {
21,599,503✔
1345
                infoPrint("will sleep %"PRIu32" milliseconds then re-insert\n",
×
1346
                          trying_interval);
1347
                toolsMsleep(trying_interval);
×
1348
                code = queryDbExecCall(pThreadInfo->conn, pThreadInfo->buffer);
×
UNCOV
1349
                if (trying != -1) {
×
1350
                    trying--;
×
1351
                }
1352
            }
1353
            break;
21,599,503✔
1354
        // REST
1355
        case REST_IFACE:
224✔
1356
            debugPrint("buffer: %s\n", pThreadInfo->buffer);
224✔
1357
            code = postProcessSql(pThreadInfo->buffer,
224✔
1358
                                database->dbName,
1359
                                database->precision,
1360
                                stbInfo->iface,
224✔
1361
                                stbInfo->lineProtocol,
224✔
1362
                                g_arguments->port,
224✔
1363
                                stbInfo->tcpTransfer,
224✔
1364
                                pThreadInfo->sockfd,
1365
                                pThreadInfo->filePath);
224✔
1366
            while (code && trying && !g_arguments->terminate) {
224✔
1367
                infoPrint("will sleep %"PRIu32" milliseconds then re-insert\n",
×
1368
                          trying_interval);
1369
                toolsMsleep(trying_interval);
×
1370
                code = postProcessSql(pThreadInfo->buffer,
×
1371
                                    database->dbName,
1372
                                    database->precision,
1373
                                    stbInfo->iface,
×
1374
                                    stbInfo->lineProtocol,
×
1375
                                    g_arguments->port,
×
1376
                                    stbInfo->tcpTransfer,
×
1377
                                    pThreadInfo->sockfd,
1378
                                    pThreadInfo->filePath);
×
1379
                if (trying != -1) {
×
1380
                    trying--;
×
1381
                }
1382
            }
1383
            break;
224✔
1384

1385
        case STMT_IFACE:
7,658,732✔
1386
            // add batch
1387
            if(!stbInfo->autoTblCreating) {
7,658,732✔
1388
                start = toolsGetTimestampUs();
736,239✔
1389
                if (taos_stmt_add_batch(pThreadInfo->conn->stmt) != 0) {
736,237✔
1390
                    errorPrint("taos_stmt_add_batch() failed! reason: %s\n",
×
1391
                            taos_stmt_errstr(pThreadInfo->conn->stmt));
1392
                    return -1;
×
1393
                }
1394
                if(delay3) {
736,140✔
1395
                    *delay3 += toolsGetTimestampUs() - start;
736,336✔
1396
                }
1397
            }
1398

1399
            // execute
1400
            code = taos_stmt_execute(pThreadInfo->conn->stmt);
7,667,609✔
1401
            if (code) {
7,675,081✔
1402
                errorPrint(
×
1403
                           "failed to execute insert statement. reason: %s\n",
1404
                           taos_stmt_errstr(pThreadInfo->conn->stmt));
1405
                code = -1;
10✔
1406
            }
1407
            break;
7,675,075✔
1408

1409
        case STMT2_IFACE:
924,726✔
1410
            // execute
1411
            code = taos_stmt2_exec(pThreadInfo->conn->stmt2, &affectRows);
924,726✔
1412
            if (code) {
924,638✔
1413
                errorPrint( "failed to call taos_stmt2_exec(). reason: %s\n", taos_stmt2_error(pThreadInfo->conn->stmt2));
×
1414
                code = -1;
×
1415
            }
1416
            debugPrint( "succ call taos_stmt2_exec() affectRows:%d\n", affectRows);
924,638✔
1417
            break;
924,695✔
1418

1419
        case SML_IFACE:
40,746✔
1420
            res = taos_schemaless_insert(
102,196✔
1421
                pThreadInfo->conn->taos, pThreadInfo->lines,
40,746✔
1422
                (TSDB_SML_JSON_PROTOCOL == protocol
1423
                    || SML_JSON_TAOS_FORMAT == protocol)
31,134✔
1424
                    ? 0 : k,
1425
                (SML_JSON_TAOS_FORMAT == protocol)
1426
                    ? TSDB_SML_JSON_PROTOCOL : protocol,
1427
                (TSDB_SML_LINE_PROTOCOL == protocol)
1428
                    ? database->sml_precision
1429
                    : TSDB_SML_TIMESTAMP_NOT_CONFIGURED);
1430
            code = taos_errno(res);
40,774✔
1431
            trying = stbInfo->keep_trying;
40,774✔
1432
            while (code && trying && !g_arguments->terminate) {
40,774✔
1433
                taos_free_result(res);
×
1434
                infoPrint("will sleep %"PRIu32" milliseconds then re-insert\n",
×
1435
                          trying_interval);
1436
                toolsMsleep(trying_interval);
×
1437
                res = taos_schemaless_insert(
×
1438
                        pThreadInfo->conn->taos, pThreadInfo->lines,
×
1439
                        (TSDB_SML_JSON_PROTOCOL == protocol
1440
                            || SML_JSON_TAOS_FORMAT == protocol)
×
1441
                            ? 0 : k,
1442
                        (SML_JSON_TAOS_FORMAT == protocol)
1443
                            ? TSDB_SML_JSON_PROTOCOL : protocol,
1444
                        (TSDB_SML_LINE_PROTOCOL == protocol)
1445
                            ? database->sml_precision
1446
                            : TSDB_SML_TIMESTAMP_NOT_CONFIGURED);
1447
                code = taos_errno(res);
×
1448
                if (trying != -1) {
×
1449
                    trying--;
×
1450
                }
1451
            }
1452

1453
            if (code != TSDB_CODE_SUCCESS && !g_arguments->terminate) {
40,774✔
1454
                debugPrint("Failed to execute "
×
1455
                           "schemaless insert content: %s\n\n",
1456
                        pThreadInfo->lines?(pThreadInfo->lines[0]?
1457
                            pThreadInfo->lines[0]:""):"");
1458
                errorPrint(
×
1459
                    "failed to execute schemaless insert. "
1460
                        "code: 0x%08x reason: %s\n\n",
1461
                        code, taos_errstr(res));
1462
            }
1463
            taos_free_result(res);
40,774✔
1464
            break;
40,774✔
1465
        case SML_REST_IFACE: {
2,240✔
1466
            if (TSDB_SML_JSON_PROTOCOL == protocol
2,240✔
1467
                    || SML_JSON_TAOS_FORMAT == protocol) {
1,610✔
1468
                code = postProcessSql(pThreadInfo->lines[0], database->dbName,
630✔
1469
                                    database->precision, stbInfo->iface,
630✔
1470
                                    protocol, g_arguments->port,
630✔
1471
                                    stbInfo->tcpTransfer,
630✔
1472
                                    pThreadInfo->sockfd, pThreadInfo->filePath);
630✔
1473
            } else {
1474
                int len = 0;
1,610✔
1475
                for (int i = 0; i < k; i++) {
15,022✔
1476
                    if (strlen(pThreadInfo->lines[i]) != 0) {
13,454✔
1477
                        int n;
1478
                        if (TSDB_SML_TELNET_PROTOCOL == protocol
13,454✔
1479
                                && stbInfo->tcpTransfer) {
8,974✔
1480
                            n = snprintf(pThreadInfo->buffer + len,
4,480✔
1481
                                            TSDB_MAX_ALLOWED_SQL_LEN - len,
4,480✔
1482
                                           "put %s\n", pThreadInfo->lines[i]);
4,480✔
1483
                        } else {
1484
                            n = snprintf(pThreadInfo->buffer + len,
8,974✔
1485
                                            TSDB_MAX_ALLOWED_SQL_LEN - len,
8,974✔
1486
                                            "%s\n",
1487
                                           pThreadInfo->lines[i]);
8,974✔
1488
                        }
1489
                        if (n < 0 || n >= TSDB_MAX_ALLOWED_SQL_LEN - len) {
13,454✔
1490
                            errorPrint("%s() LN%d snprintf overflow on %d\n",
42✔
1491
                                __func__, __LINE__, i);
1492
                            break;
×
1493
                        } else {
1494
                            len += n;
13,412✔
1495
                        }
1496
                    } else {
1497
                        break;
×
1498
                    }
1499
                }
1500
                if (g_arguments->terminate) {
1,568✔
1501
                    break;
×
1502
                }
1503
                code = postProcessSql(pThreadInfo->buffer, database->dbName,
1,568✔
1504
                        database->precision,
1505
                        stbInfo->iface, protocol,
1,568✔
1506
                        g_arguments->port,
1,568✔
1507
                        stbInfo->tcpTransfer,
1,568✔
1508
                        pThreadInfo->sockfd, pThreadInfo->filePath);
1,568✔
1509
            }
1510
            break;
2,254✔
1511
        }
1512
    }
1513
    return code;
30,240,371✔
1514
}
1515

1516
static int smartContinueIfFail(threadInfo *pThreadInfo,
×
1517
                               SChildTable *childTbl,
1518
                               char *tagData,
1519
                               int64_t i,
1520
                               char *ttl) {
1521
    SDataBase *  database = pThreadInfo->dbInfo;
×
1522
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
×
1523
    char *buffer =
1524
        benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, false);
×
1525
    (void)snprintf(
×
1526
            buffer, TSDB_MAX_ALLOWED_SQL_LEN,
1527
            g_arguments->escape_character ?
×
1528
                "CREATE TABLE IF NOT EXISTS `%s`.`%s` USING `%s`.`%s` TAGS (%s) %s "
1529
                : "CREATE TABLE IF NOT EXISTS %s.%s USING %s.%s TAGS (%s) %s ",
1530
            database->dbName, childTbl->name, database->dbName,
1531
            stbInfo->stbName,
1532
            tagData + i * stbInfo->lenOfTags, ttl);
×
1533
    debugPrint("creating table: %s\n", buffer);
×
1534

1535
    int ret;
1536
    if (REST_IFACE == stbInfo->iface) {
×
1537
        ret = queryDbExecRest(buffer,
×
1538
                              database->dbName,
1539
                              database->precision,
1540
                              stbInfo->iface,
×
1541
                              stbInfo->lineProtocol,
×
1542
                              stbInfo->tcpTransfer,
×
1543
                              pThreadInfo->sockfd);
1544
    } else {
1545
        ret = queryDbExecCall(pThreadInfo->conn, buffer);
×
1546
        int32_t trying = g_arguments->keep_trying;
×
1547
        while (ret && trying) {
×
1548
            infoPrint("will sleep %"PRIu32" milliseconds then "
×
1549
                      "re-create table %s\n",
1550
                      g_arguments->trying_interval, buffer);
1551
            toolsMsleep(g_arguments->trying_interval);
×
1552
            ret = queryDbExecCall(pThreadInfo->conn, buffer);
×
1553
            if (trying != -1) {
×
1554
                trying--;
×
1555
            }
1556
        }
1557
    }
1558

1559
    tmfree(buffer);
×
1560

1561
    return ret;
×
1562
}
1563

1564
/*
 * Release the per-thread JSON array (if any) and print the final
 * throughput line for a finished insert thread.
 *
 * pThreadInfo  worker context; nothing happens when it is NULL.
 * mode         label printed in the summary line.
 */
static void cleanupAndPrint(threadInfo *pThreadInfo, char *mode) {
    if (NULL == pThreadInfo) {
        return;
    }

    if (pThreadInfo->json_array) {
        tools_cJSON_Delete(pThreadInfo->json_array);
        pThreadInfo->json_array = NULL;
    }

    // a zero delay would make the rate below a division by zero
    if (pThreadInfo->totalDelay == 0) {
        pThreadInfo->totalDelay = 1;
    }

    // totalDelay is divided by 1E6 before being used as the rate denominator
    double recordsPerSecond = (double)(pThreadInfo->totalInsertRows /
                              ((double)pThreadInfo->totalDelay / 1E6));
    succPrint(
        "thread[%d] %s mode, completed total inserted rows: %" PRIu64
        ", %.2f records/second\n",
        pThreadInfo->threadID,
        mode,
        pThreadInfo->totalInsertRows,
        recordsPerSecond);
}
179,473✔
1583

1584
/*
 * Decide whether the next row gets an out-of-order timestamp.
 *
 * stbInfo        super table settings (disorderRatio, disorderRange,
 *                startTimestamp).
 * disorderRange  in/out countdown; decremented on every disordered row and
 *                reloaded from stbInfo->disorderRange when it reaches 0.
 *
 * Returns 0 when the row keeps its regular timestamp, otherwise
 * startTimestamp minus the updated *disorderRange.
 */
static int64_t getDisorderTs(SSuperTable *stbInfo, int *disorderRange) {
    // disorder disabled for this super table
    if (!(stbInfo->disorderRatio > 0)) {
        return 0;
    }

    // draw 0..99 and compare against the configured percentage
    int roll = taosRandom() % 100;
    if (!(roll < stbInfo->disorderRatio)) {
        return 0;
    }

    *disorderRange -= 1;
    if (*disorderRange == 0) {
        *disorderRange = stbInfo->disorderRange;
    }

    int64_t baseTs = stbInfo->startTimestamp;
    int64_t shiftedTs = baseTs - *disorderRange;
    debugPrint("rand_num: %d, < disorderRatio: %d, "
               "disorderTs: %"PRId64"\n",
               roll, stbInfo->disorderRatio,
               shiftedTs);
    return shiftedTs;
}
1603

1604
void loadChildTableInfo(threadInfo* pThreadInfo) {
179,646✔
1605
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
179,646✔
1606
    if(!g_arguments->pre_load_tb_meta) {
181,003✔
1607
        return ;
180,989✔
1608
    }
1609
    if(pThreadInfo->conn == NULL) {
14✔
1610
        return ;
×
1611
    }
1612

1613
    char *db    = pThreadInfo->dbInfo->dbName;
14✔
1614
    int64_t cnt = pThreadInfo->end_table_to - pThreadInfo->start_table_from;
14✔
1615

1616
    // 100k
1617
    int   bufLen = 100 * 1024;
14✔
1618
    char *buf    = benchCalloc(1, bufLen, false);
14✔
1619
    int   pos    = 0;
×
1620
    infoPrint("start load child tables(%"PRId64") info...\n", cnt);
×
1621
    int64_t start = toolsGetTimestampUs();
×
1622
    for(int64_t i = pThreadInfo->start_table_from; i < pThreadInfo->end_table_to; i++) {
×
1623
        SChildTable *childTbl = stbInfo->childTblArray[i];
×
1624
        pos += sprintf(buf + pos, ",%s.%s", db, childTbl->name);
×
1625

1626
        if(pos >= bufLen - 256 || i + 1 == pThreadInfo->end_table_to) {
×
1627
            taos_load_table_info(pThreadInfo->conn, buf);
×
1628
            pos = 0;
×
1629
        }
1630
    }
1631
    int64_t delay = toolsGetTimestampUs() - start;
×
1632
    infoPrint("end load child tables info. delay=%.2fs\n", delay/1E6);
×
1633
    pThreadInfo->totalDelay += delay;
×
1634

1635
    tmfree(buf);
×
1636
}
1637

1638
// create conn again
1639
/*
 * Bring the thread's stmt2 handle back to life after a failed submit.
 *
 * First tries to re-init stmt2 on the existing connection. If that fails
 * (or there is no connection left from an earlier failed attempt), the
 * connection is closed and a new one is created, stmt2 is initialised on
 * it and the database is selected.
 *
 * Returns 0 on success, -1 on failure. After a failed initBenchConn()
 * pThreadInfo->conn is NULL; calling this function again is safe.
 */
int32_t reCreateConn(threadInfo * pThreadInfo) {
    // single
    bool single = true;
    if (pThreadInfo->dbInfo->superTbls->size > 1) {
        single = false;
    }

    // conn is NULL when a previous call failed in initBenchConn();
    // in that case skip straight to creating a new connection
    if (pThreadInfo->conn) {
        //
        // retry stmt2 init
        //

        // stmt2 close
        if (pThreadInfo->conn->stmt2) {
            taos_stmt2_close(pThreadInfo->conn->stmt2);
            pThreadInfo->conn->stmt2 = NULL;
        }

        // retry stmt2 init , maybe success
        pThreadInfo->conn->stmt2 = initStmt2(pThreadInfo->conn->taos, single);
        if (pThreadInfo->conn->stmt2) {
            succPrint("%s", "reCreateConn first taos_stmt2_init() success and return.\n");
            return 0;
        }

        //
        // close old
        //
        closeBenchConn(pThreadInfo->conn);
        pThreadInfo->conn = NULL;
    }

    //
    // create new
    //

    // conn
    pThreadInfo->conn = initBenchConn(pThreadInfo->dbInfo->dbName);
    if (pThreadInfo->conn == NULL) {
        errorPrint("%s", "reCreateConn initBenchConn failed.\n");
        return -1;
    }
    // stmt2
    pThreadInfo->conn->stmt2 = initStmt2(pThreadInfo->conn->taos, single);
    if (NULL == pThreadInfo->conn->stmt2) {
        errorPrint("reCreateConn taos_stmt2_init() failed, reason: %s\n", taos_errstr(NULL));
        return -1;
    }

    succPrint("%s", "reCreateConn second taos_stmt2_init() success.\n");
    // select db
    if (taos_select_db(pThreadInfo->conn->taos, pThreadInfo->dbInfo->dbName)) {
        errorPrint("second taos select database(%s) failed\n", pThreadInfo->dbInfo->dbName);
        return -1;
    }

    return 0;
}
1695

1696
// reinit
1697
/*
 * Re-establish the stmt2 handle via reCreateConn() and prepare it again.
 *
 * w is forwarded unchanged to prepareStmt2().
 * Returns 0 on success, otherwise the code of the step that failed.
 */
int32_t reConnectStmt2(threadInfo * pThreadInfo, int32_t w) {
    int32_t rc = reCreateConn(pThreadInfo);
    if (0 == rc) {
        // connection and stmt2 are back, prepare the statement again
        rc = prepareStmt2(pThreadInfo->conn->stmt2, pThreadInfo->stbInfo, NULL, w,
                          pThreadInfo->dbInfo->dbName);
    }
    return rc;
}
1712

1713
/*
 * One bind + execute round for stmt2.
 *
 * bindv      bound tables/columns passed to taos_stmt2_bind_param().
 * delay1     accumulates the time spent in taos_stmt2_bind_param().
 * delay3     forwarded to execInsert().
 * startTs    set right before execInsert() is called.
 * endTs      set right after execInsert() returns.
 * generated  row count forwarded to execInsert().
 *
 * Returns the bind error code when binding fails (startTs/endTs are left
 * untouched in that case), otherwise the result of execInsert().
 */
int32_t submitStmt2Impl(threadInfo * pThreadInfo, TAOS_STMT2_BINDV *bindv, int64_t *delay1, int64_t *delay3,
                    int64_t* startTs, int64_t* endTs, uint32_t* generated) {
    // bind all tables in one call (col index -1)
    int64_t bindBegin = toolsGetTimestampUs();
    int32_t rc = taos_stmt2_bind_param(pThreadInfo->conn->stmt2, bindv, -1);
    if (0 != rc) {
        errorPrint("taos_stmt2_bind_param failed, reason: %s\n", taos_stmt2_error(pThreadInfo->conn->stmt2));
        return rc;
    }
    debugPrint("interlace taos_stmt2_bind_param() ok.  bindv->count=%d \n", bindv->count);
    *delay1 += toolsGetTimestampUs() - bindBegin;

    // run the insert, bracketing it with timestamps for the caller
    *startTs = toolsGetTimestampUs();
    rc = execInsert(pThreadInfo, *generated, delay3);
    *endTs = toolsGetTimestampUs();

    return rc;
}
1731

1732
/*
 * Submit a stmt2 batch, retrying with a fresh stmt2/connection on failure.
 *
 * Attempts: 1 by default. When stbInfo->continueIfFail is YES_IF_FAILED
 * the budget is stbInfo->keep_trying if that is > 1, otherwise 3.
 * Between attempts the thread sleeps stbInfo->trying_interval and calls
 * reConnectStmt2(); an attempt whose reconnect failed is counted without
 * submitting.
 *
 * Returns 0 on success, -1 once the attempt budget is used up.
 */
int32_t submitStmt2(threadInfo * pThreadInfo, TAOS_STMT2_BINDV *bindv, int64_t *delay1, int64_t *delay3,
                    int64_t* startTs, int64_t* endTs, uint32_t* generated, int32_t w) {
    SSuperTable* stbInfo = pThreadInfo->stbInfo;

    // attempt budget
    int32_t attemptsLeft = 1;
    if (YES_IF_FAILED == stbInfo->continueIfFail) {
        attemptsLeft = 3; // default
        if (stbInfo->keep_trying > 1) {
            attemptsLeft = stbInfo->keep_trying;
        }
    }

    int32_t retried   = 0;
    bool    connected = true;
    for (;;) {
        // only submit when the previous reconnect (if any) worked
        int32_t code = -1;
        if (connected) {
            code = submitStmt2Impl(pThreadInfo, bindv, delay1, delay3, startTs, endTs, generated);
        }
        if (0 == code) {
            return 0;
        }

        // out of attempts: report and give up
        if (0 == --attemptsLeft) {
            char tip[64] = "";
            if (retried > 0) {
                (void)snprintf(tip, sizeof(tip), " after retry %d", retried);
            }
            errorPrint("finally faild execute submitStmt2()%s\n", tip);
            return -1;
        }

        // back off, then rebuild stmt2 (and the connection if needed)
        toolsMsleep(stbInfo->trying_interval);
        infoPrint("stmt2 start retry submit i=%d  after sleep %d ms...\n", retried++, stbInfo->trying_interval);
        connected = (0 == reConnectStmt2(pThreadInfo, w));
        if (!connected) {
            errorPrint("faild reConnectStmt2 and retry again for next i=%d \n", retried);
        }
    }
}
1790

1791
static void *syncWriteInterlace(void *sarg) {
8,185✔
1792
    threadInfo * pThreadInfo = (threadInfo *)sarg;
8,185✔
1793
    SDataBase *  database = pThreadInfo->dbInfo;
8,185✔
1794
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
8,245✔
1795
    infoPrint(
8,245✔
1796
              "thread[%d] start interlace inserting into table from "
1797
              "%" PRIu64 " to %" PRIu64 "\n",
1798
              pThreadInfo->threadID, pThreadInfo->start_table_from,
1799
              pThreadInfo->end_table_to);
1800

1801
    int64_t insertRows = stbInfo->insertRows;
8,245✔
1802
    int32_t interlaceRows = stbInfo->interlaceRows;
8,245✔
1803
    uint32_t nBatchTable  = g_arguments->reqPerReq / interlaceRows;
8,245✔
1804
    uint64_t   lastPrintTime = toolsGetTimestampMs();
8,245✔
1805
    uint64_t   lastTotalInsertRows = 0;
8,245✔
1806
    int64_t   startTs = toolsGetTimestampUs();
8,245✔
1807
    int64_t   endTs;
7,584✔
1808
    uint64_t   tableSeq = pThreadInfo->start_table_from;
8,245✔
1809
    int disorderRange = stbInfo->disorderRange;
8,245✔
1810
    int32_t i = 0;
8,245✔
1811

1812
    loadChildTableInfo(pThreadInfo);
8,245✔
1813
    // check if filling back mode
1814
    bool fillBack = false;
8,245✔
1815
    if(stbInfo->useNow && stbInfo->startFillbackTime) {
8,245✔
1816
        fillBack = true;
×
1817
        pThreadInfo->start_time = stbInfo->startFillbackTime;
×
1818
        infoPrint("start time change to startFillbackTime = %"PRId64" \n", pThreadInfo->start_time);
×
1819
    }
1820

1821
    FILE* csvFile = NULL;
8,245✔
1822
    char* tagData = NULL;
8,245✔
1823
    int   w       = 0; // record tags position, if w > TAG_BATCH_COUNT , need recreate new tag values
8,245✔
1824
    if (stbInfo->autoTblCreating) {
8,245✔
1825
        csvFile = openTagCsv(stbInfo, pThreadInfo->start_table_from);
147✔
1826
        tagData = benchCalloc(TAG_BATCH_COUNT, stbInfo->lenOfTags, false);
147✔
1827
    }
1828
    int64_t delay1 = 0;
8,245✔
1829
    int64_t delay2 = 0;
8,245✔
1830
    int64_t delay3 = 0;
8,245✔
1831
    bool    firstInsertTb = true;
8,245✔
1832

1833
    TAOS_STMT2_BINDV *bindv = NULL;
8,245✔
1834

1835
    // create bindv
1836
    if(stbInfo->iface == STMT2_IFACE) {
8,245✔
1837
        int32_t tagCnt = stbInfo->autoTblCreating ? stbInfo->tags->size : 0;
1,834✔
1838
        if (csvFile) {
1,834✔
1839
            tagCnt = 0;
×
1840
        }
1841
        //int32_t tagCnt = stbInfo->tags->size;
1842
        bindv = createBindV(nBatchTable,  tagCnt, stbInfo->cols->size + 1);
1,834✔
1843
    }
1844

1845
    bool oldInitStmt = stbInfo->autoTblCreating;
8,245✔
1846
    // not auto create table call once
1847
    if(stbInfo->iface == STMT_IFACE && !oldInitStmt) {
8,245✔
1848
        debugPrint("call prepareStmt for stable:%s\n", stbInfo->stbName);
1,628✔
1849
        if (prepareStmt(pThreadInfo->conn->stmt, stbInfo, tagData, w, database->dbName)) {
1,628✔
1850
            g_fail = true;
×
1851
            goto free_of_interlace;
×
1852
        }
1853
    }
1854
    else if (stbInfo->iface == STMT2_IFACE) {
6,617✔
1855
        // only prepare once
1856
        if (prepareStmt2(pThreadInfo->conn->stmt2, stbInfo, NULL, w, database->dbName)) {
1,834✔
1857
            g_fail = true;
×
1858
            goto free_of_interlace;
×
1859
        }
1860
    }
1861
    int64_t index = tableSeq;
8,245✔
1862
    while (insertRows > 0) {
2,154,489✔
1863
        int64_t tmp_total_insert_rows = 0;
2,146,188✔
1864
        uint32_t generated = 0;
2,146,188✔
1865
        if (insertRows <= interlaceRows) {
2,147,635✔
1866
            interlaceRows = insertRows;
9,154✔
1867
        }
1868

1869
        // loop each table
1870
        for (i = 0; i < nBatchTable; i++) {
5,688,269✔
1871
            if (g_arguments->terminate) {
5,684,236✔
1872
                goto free_of_interlace;
×
1873
            }
1874
            int64_t pos = pThreadInfo->pos;
5,683,244✔
1875

1876
            // get childTable
1877
            SChildTable *childTbl;
1878
            if (g_arguments->bind_vgroup) {
5,684,769✔
1879
                childTbl = pThreadInfo->vg->childTblArray[tableSeq];
2,984,758✔
1880
            } else {
1881
                childTbl = stbInfo->childTblArray[tableSeq];
2,700,277✔
1882
            }
1883

1884
            char*  tableName   = childTbl->name;
5,685,539✔
1885
            char *sampleDataBuf = childTbl->useOwnSample?
6,272,058✔
1886
                                        childTbl->sampleDataBuf:
5,685,788✔
1887
                                        stbInfo->sampleDataBuf;
1888
            // init ts
1889
            if(childTbl->ts == 0) {
5,686,432✔
1890
               childTbl->ts = pThreadInfo->start_time;
49,326✔
1891
            }
1892
            char ttl[SMALL_BUFF_LEN] = "";
5,686,120✔
1893
            if (stbInfo->ttl != 0) {
5,684,205✔
1894
                (void)snprintf(ttl, SMALL_BUFF_LEN, "TTL %d", stbInfo->ttl);
1,568✔
1895
            }
1896
            switch (stbInfo->iface) {
5,681,388✔
1897
                case REST_IFACE:
1,200,721✔
1898
                case TAOSC_IFACE: {
1899
                    char escapedTbName[TSDB_TABLE_NAME_LEN+2] = "\0";
1,200,721✔
1900
                    if (g_arguments->escape_character) {
1,200,786✔
1901
                        (void)snprintf(escapedTbName, TSDB_TABLE_NAME_LEN+2, "`%s`",
298,322✔
1902
                                tableName);
1903
                    } else {
1904
                        (void)snprintf(escapedTbName, TSDB_TABLE_NAME_LEN+2, "%s",
901,484✔
1905
                                tableName);
1906
                    }
1907
                    if (i == 0) {
1,199,806✔
1908
                        (void)ds_add_str(&pThreadInfo->buffer, STR_INSERT_INTO);
766,564✔
1909
                    }
1910

1911
                    // generator
1912
                    if (stbInfo->autoTblCreating && w == 0) {
1,199,746✔
1913
                        if(!generateTagData(stbInfo, tagData, TAG_BATCH_COUNT, csvFile, NULL, index)) {
49✔
1914
                            goto free_of_interlace;
×
1915
                        }
1916
                    }
1917

1918
                    // create child table
1919
                    if (stbInfo->partialColNum == stbInfo->cols->size) {
1,201,497✔
1920
                        if (stbInfo->autoTblCreating) {
1,199,447✔
1921
                            ds_add_strs(&pThreadInfo->buffer, 8,
×
1922
                                    escapedTbName,
1923
                                    " USING `",
1924
                                    stbInfo->stbName,
1925
                                    "` TAGS (",
1926
                                    tagData + stbInfo->lenOfTags * w,
×
1927
                                    ") ", ttl, " VALUES ");
1928
                        } else {
1929
                            ds_add_strs(&pThreadInfo->buffer, 2,
1,199,923✔
1930
                                    escapedTbName, " VALUES ");
1931
                        }
1932
                    } else {
1933
                        if (stbInfo->autoTblCreating) {
1,568✔
1934
                            ds_add_strs(&pThreadInfo->buffer, 10,
1,568✔
1935
                                        escapedTbName,
1936
                                        " (",
1937
                                        stbInfo->partialColNameBuf,
1938
                                        ") USING `",
1939
                                        stbInfo->stbName,
1940
                                        "` TAGS (",
1941
                                        tagData + stbInfo->lenOfTags * w,
1,568✔
1942
                                        ") ", ttl, " VALUES ");
1943
                        } else {
1944
                            ds_add_strs(&pThreadInfo->buffer, 4,
×
1945
                                        escapedTbName,
1946
                                        "(",
1947
                                        stbInfo->partialColNameBuf,
1948
                                        ") VALUES ");
1949
                        }
1950
                    }
1951

1952
                    // move next
1953
                    if (stbInfo->autoTblCreating && ++w >= TAG_BATCH_COUNT) {
1,201,832✔
1954
                        // reset for gen again
1955
                        w = 0;
×
1956
                        index += TAG_BATCH_COUNT;
×
1957
                    }
1958

1959
                    // write child data with interlaceRows
1960
                    for (int64_t j = 0; j < interlaceRows; j++) {
4,475,282✔
1961
                        int64_t disorderTs = getDisorderTs(stbInfo,
3,273,999✔
1962
                                &disorderRange);
1963

1964
                        // change fillBack mode with condition
1965
                        if(fillBack) {
3,273,264✔
1966
                            int64_t tsnow = toolsGetTimestamp(database->precision);
×
1967
                            if(childTbl->ts >= tsnow){
×
1968
                                fillBack = false;
×
1969
                                infoPrint("fillBack mode set end. because timestamp(%"PRId64") >= now(%"PRId64")\n", childTbl->ts, tsnow);
×
1970
                            }
1971
                        }
1972

1973
                        // timestamp
1974
                        char time_string[BIGINT_BUFF_LEN];
3,268,025✔
1975
                        if(stbInfo->useNow && stbInfo->interlaceRows == 1 && !fillBack) {
3,274,045✔
1976
                            int64_t now = toolsGetTimestamp(database->precision);
39✔
1977
                            (void)snprintf(time_string, BIGINT_BUFF_LEN, "%"PRId64"", now);
39✔
1978
                        } else {
1979
                            (void)snprintf(time_string, BIGINT_BUFF_LEN, "%"PRId64"",
3,274,640✔
1980
                                    disorderTs?disorderTs:childTbl->ts);
1981
                        }
1982

1983
                        // combine rows timestamp | other cols = sampleDataBuf[pos]
1984
                        if(stbInfo->useSampleTs) {
3,274,842✔
1985
                            ds_add_strs(&pThreadInfo->buffer, 3, "(",
×
1986
                                        sampleDataBuf + pos * stbInfo->lenOfCols, ") ");
×
1987
                        } else {
1988
                            ds_add_strs(&pThreadInfo->buffer, 5, "(", time_string, ",",
3,273,911✔
1989
                                        sampleDataBuf + pos * stbInfo->lenOfCols, ") ");
3,275,251✔
1990
                        }
1991
                        // check buffer enough
1992
                        if (ds_len(pThreadInfo->buffer)
3,274,992✔
1993
                                > stbInfo->max_sql_len) {
3,274,812✔
1994
                            errorPrint("sql buffer length (%"PRIu64") "
×
1995
                                    "is larger than max sql length "
1996
                                    "(%"PRId64")\n",
1997
                                    ds_len(pThreadInfo->buffer),
1998
                                    stbInfo->max_sql_len);
1999
                            goto free_of_interlace;
×
2000
                        }
2001

2002
                        // move next
2003
                        generated++;
3,273,930✔
2004
                        pos++;
3,273,930✔
2005
                        if (pos >= g_arguments->prepared_rand) {
3,273,930✔
2006
                            pos = 0;
1,812✔
2007
                        }
2008
                        if(stbInfo->primary_key)
3,274,106✔
2009
                            debugPrint("add child=%s %"PRId64" pk cur=%d cnt=%d \n", childTbl->name, childTbl->ts, childTbl->pkCur, childTbl->pkCnt);
×
2010

2011
                        // primary key
2012
                        if (!stbInfo->primary_key || needChangeTs(stbInfo, &childTbl->pkCur, &childTbl->pkCnt)) {
3,273,806✔
2013
                            childTbl->ts += stbInfo->timestamp_step;
3,273,926✔
2014
                            if(stbInfo->primary_key)
3,273,994✔
2015
                                debugPrint("changedTs child=%s %"PRId64" pk cur=%d cnt=%d \n", childTbl->name, childTbl->ts, childTbl->pkCur, childTbl->pkCnt);
×
2016
                        }
2017

2018
                    }
2019
                    break;
1,201,283✔
2020
                }
2021
                case STMT_IFACE: {
2,958,990✔
2022
                    char escapedTbName[TSDB_TABLE_NAME_LEN+2] = "\0";
2,958,990✔
2023
                    if (g_arguments->escape_character) {
2,959,422✔
2024
                        (void)snprintf(escapedTbName, TSDB_TABLE_NAME_LEN+2,
110,435✔
2025
                                "`%s`", tableName);
2026
                    } else {
2027
                        (void)snprintf(escapedTbName, TSDB_TABLE_NAME_LEN, "%s",
2,849,367✔
2028
                                tableName);
2029
                    }
2030

2031
                    // generator
2032
                    if (stbInfo->autoTblCreating && w == 0) {
2,959,802✔
2033
                        if(!generateTagData(stbInfo, tagData, TAG_BATCH_COUNT, csvFile, NULL, index)) {
×
2034
                            goto free_of_interlace;
×
2035
                        }
2036
                    }
2037

2038
                    // old must call prepareStmt for each table
2039
                    if (oldInitStmt) {
2,960,916✔
2040
                        debugPrint("call prepareStmt for stable:%s\n", stbInfo->stbName);
×
2041
                        if (prepareStmt(pThreadInfo->conn->stmt, stbInfo, tagData, w, database->dbName)) {
×
2042
                            g_fail = true;
×
2043
                            goto free_of_interlace;
×
2044
                        }
2045
                    }
2046

2047
                    int64_t start = toolsGetTimestampUs();
2,960,916✔
2048
                    if (taos_stmt_set_tbname(pThreadInfo->conn->stmt,
2,959,278✔
2049
                                             escapedTbName)) {
2050
                        errorPrint(
×
2051
                            "taos_stmt_set_tbname(%s) failed, reason: %s\n",
2052
                            tableName,
2053
                                taos_stmt_errstr(pThreadInfo->conn->stmt));
2054
                        g_fail = true;
×
2055
                        goto free_of_interlace;
×
2056
                    }
2057
                    delay1 += toolsGetTimestampUs() - start;
2,960,633✔
2058

2059
                    int32_t n = 0;
2,960,694✔
2060
                    generated += bindParamBatch(pThreadInfo, interlaceRows,
2,961,855✔
2061
                                       childTbl->ts, pos, childTbl, &childTbl->pkCur, &childTbl->pkCnt, &n, &delay2, &delay3);
2062

2063
                    // move next
2064
                    pos += interlaceRows;
2,960,519✔
2065
                    if (pos + interlaceRows + 1 >= g_arguments->prepared_rand) {
2,960,519✔
2066
                        pos = 0;
1,105✔
2067
                    }
2068
                    childTbl->ts += stbInfo->timestamp_step * n;
2,960,967✔
2069

2070
                    // move next
2071
                    if (stbInfo->autoTblCreating) {
2,961,375✔
2072
                        w += 1;
×
2073
                        if (w >= TAG_BATCH_COUNT) {
×
2074
                            // reset for gen again
2075
                            w = 0;
×
2076
                            index += TAG_BATCH_COUNT;
×
2077
                        }
2078
                    }
2079

2080
                    break;
2,960,551✔
2081
                }
2082
                case STMT2_IFACE: {
1,419,051✔
2083
                    // tbnames
2084
                    bindv->tbnames[i] = childTbl->name;
1,419,051✔
2085

2086
                    // tags
2087
                    if (stbInfo->autoTblCreating && firstInsertTb) {
1,419,721✔
2088
                        // create
2089
                        if (w == 0) {
280✔
2090
                            // recreate sample tags
2091
                            if(!generateTagData(stbInfo, tagData, TAG_BATCH_COUNT, csvFile, pThreadInfo->tagsStmt, index)) {
42✔
2092
                                goto free_of_interlace;
×
2093
                            }
2094
                        }
2095

2096
                        if (csvFile) {
280✔
2097
                            if (prepareStmt2(pThreadInfo->conn->stmt2, stbInfo, tagData, w, database->dbName)) {
×
2098
                                g_fail = true;
×
2099
                                goto free_of_interlace;
×
2100
                            }
2101
                        }
2102

2103
                        bindVTags(bindv, i, w, pThreadInfo->tagsStmt);
280✔
2104
                    }
2105

2106
                    // cols
2107
                    int32_t n = 0;
1,419,515✔
2108
                    generated += bindVColsInterlace(bindv, i, pThreadInfo, interlaceRows, childTbl->ts, pos,
1,419,558✔
2109
                                                    childTbl, &childTbl->pkCur, &childTbl->pkCnt, &n);
2110
                    // move next
2111
                    pos += interlaceRows;
1,419,008✔
2112
                    if (pos + interlaceRows + 1 >= g_arguments->prepared_rand) {
1,419,008✔
2113
                        pos = 0;
4,100✔
2114
                    }
2115
                    childTbl->ts += stbInfo->timestamp_step * n;
1,419,420✔
2116
                    if (stbInfo->autoTblCreating) {
1,419,274✔
2117
                        w += 1;
565,348✔
2118
                        if (w >= TAG_BATCH_COUNT) {
565,348✔
2119
                            // reset for gen again
2120
                            w = 0;
5,656✔
2121
                            index += TAG_BATCH_COUNT;
5,656✔
2122
                        }
2123
                    }
2124

2125
                    break;
1,419,240✔
2126
                }
2127
                case SML_REST_IFACE:
104,169✔
2128
                case SML_IFACE: {
2129
                    int protocol = stbInfo->lineProtocol;
104,169✔
2130
                    for (int64_t j = 0; j < interlaceRows; j++) {
221,962✔
2131
                        int64_t disorderTs = getDisorderTs(stbInfo,
117,842✔
2132
                                &disorderRange);
2133
                        if (TSDB_SML_JSON_PROTOCOL == protocol) {
117,842✔
2134
                            tools_cJSON *tag = tools_cJSON_Duplicate(
6,720✔
2135
                                tools_cJSON_GetArrayItem(
6,720✔
2136
                                    pThreadInfo->sml_json_tags,
6,720✔
2137
                                    (int)tableSeq -
6,720✔
2138
                                        pThreadInfo->start_table_from),
6,720✔
2139
                                    true);
2140
                            generateSmlJsonCols(
6,720✔
2141
                                pThreadInfo->json_array, tag, stbInfo,
2142
                                database->sml_precision,
6,720✔
2143
                                    disorderTs?disorderTs:childTbl->ts);
2144
                        } else if (SML_JSON_TAOS_FORMAT == protocol) {
111,122✔
2145
                            tools_cJSON *tag = tools_cJSON_Duplicate(
×
2146
                                tools_cJSON_GetArrayItem(
×
2147
                                    pThreadInfo->sml_json_tags,
×
2148
                                    (int)tableSeq -
×
2149
                                        pThreadInfo->start_table_from),
×
2150
                                    true);
2151
                            generateSmlTaosJsonCols(
×
2152
                                pThreadInfo->json_array, tag, stbInfo,
2153
                                database->sml_precision,
×
2154
                                disorderTs?disorderTs:childTbl->ts);
2155
                        } else if (TSDB_SML_LINE_PROTOCOL == protocol) {
111,122✔
2156
                            (void)snprintf(
204,373✔
2157
                                pThreadInfo->lines[generated],
104,451✔
2158
                                stbInfo->lenOfCols + stbInfo->lenOfTags,
104,353✔
2159
                                "%s %s %" PRId64,
2160
                                pThreadInfo
2161
                                    ->sml_tags[(int)tableSeq -
204,324✔
2162
                                               pThreadInfo->start_table_from],
104,402✔
2163
                                    sampleDataBuf + pos * stbInfo->lenOfCols,
104,402✔
2164
                                disorderTs?disorderTs:childTbl->ts);
2165
                        } else {
2166
                            (void)snprintf(
6,720✔
2167
                                pThreadInfo->lines[generated],
6,720✔
2168
                                stbInfo->lenOfCols + stbInfo->lenOfTags,
6,720✔
2169
                                "%s %" PRId64 " %s %s", stbInfo->stbName,
2170
                                disorderTs?disorderTs:childTbl->ts,
2171
                                    sampleDataBuf + pos * stbInfo->lenOfCols,
6,720✔
2172
                                pThreadInfo
2173
                                    ->sml_tags[(int)tableSeq -
6,720✔
2174
                                               pThreadInfo->start_table_from]);
6,720✔
2175
                        }
2176
                        generated++;
117,891✔
2177
                        // primary key
2178
                        if (!stbInfo->primary_key || needChangeTs(stbInfo, &childTbl->pkCur, &childTbl->pkCnt)) {
117,891✔
2179
                            childTbl->ts += stbInfo->timestamp_step;
117,793✔
2180
                        }
2181
                    }
2182
                    if (TSDB_SML_JSON_PROTOCOL == protocol
104,120✔
2183
                            || SML_JSON_TAOS_FORMAT == protocol) {
101,768✔
2184
                        pThreadInfo->lines[0] =
2,352✔
2185
                            tools_cJSON_PrintUnformatted(
2,352✔
2186
                                pThreadInfo->json_array);
2,401✔
2187
                    }
2188
                    break;
104,071✔
2189
                }
2190
            }
2191

2192
            // move to next table in one batch
2193
            tableSeq++;
5,687,558✔
2194
            tmp_total_insert_rows += interlaceRows;
5,687,558✔
2195
            if (tableSeq > pThreadInfo->end_table_to) {
5,687,558✔
2196
                // first insert tables loop is end
2197
                firstInsertTb = false;
2,145,144✔
2198
                // one tables loop timestamp and pos add
2199
                tableSeq = pThreadInfo->start_table_from;
2,145,144✔
2200
                // save
2201
                pThreadInfo->pos = pos;
2,145,253✔
2202
                if (!stbInfo->non_stop) {
2,145,020✔
2203
                    insertRows -= interlaceRows;
2,144,836✔
2204
                }
2205

2206
                // if fillBack mode , can't sleep
2207
                if (stbInfo->insert_interval > 0 && !fillBack) {
2,145,403✔
2208
                    debugPrint("%s() LN%d, insert_interval: %"PRIu64"\n",
24,298✔
2209
                          __func__, __LINE__, stbInfo->insert_interval);
2210
                    perfPrint("sleep %" PRIu64 " ms\n",
24,298✔
2211
                                     stbInfo->insert_interval);
2212
                    toolsMsleep((int32_t)stbInfo->insert_interval);
24,298✔
2213
                }
2214

2215
                i++;
2,144,547✔
2216
                // rectify bind count
2217
                if (bindv && bindv->count != i) {
2,144,547✔
2218
                    bindv->count = i;
×
2219
                }
2220
                break;
2,144,590✔
2221
            }
2222
        }
2223

2224
        // exec
2225
        if(stbInfo->iface == STMT2_IFACE) {
2,148,734✔
2226
            // exec stmt2
2227
            if(g_arguments->debug_print)
673,051✔
2228
                showBindV(bindv, stbInfo->tags, stbInfo->cols);
×
2229
            // bind & exec stmt2
2230
            if (submitStmt2(pThreadInfo, bindv, &delay1, &delay3, &startTs, &endTs, &generated, w) != 0) {
673,137✔
2231
                g_fail = true;
×
2232
                goto free_of_interlace;
×
2233
            }
2234
        } else {
2235
            // exec other
2236
            startTs = toolsGetTimestampUs();
1,475,808✔
2237
            if (execInsert(pThreadInfo, generated, &delay3)) {
1,476,541✔
2238
                g_fail = true;
×
2239
                goto free_of_interlace;
×
2240
            }
2241
            endTs = toolsGetTimestampUs();
1,476,418✔
2242
        }
2243

2244
        debugPrint("execInsert tableIndex=%d left insert rows=%"PRId64" generated=%d\n", i, insertRows, generated);
2,148,883✔
2245

2246
        // reset count
2247
        if(bindv) {
2,149,748✔
2248
            bindv->count = 0;
673,003✔
2249
        }
2250

2251
        pThreadInfo->totalInsertRows += tmp_total_insert_rows;
2,149,705✔
2252

2253
        if (g_arguments->terminate) {
2,149,280✔
2254
            goto free_of_interlace;
×
2255
        }
2256

2257
        int protocol = stbInfo->lineProtocol;
2,149,172✔
2258
        switch (stbInfo->iface) {
2,149,123✔
2259
            case TAOSC_IFACE:
766,394✔
2260
            case REST_IFACE:
2261
                debugPrint("pThreadInfo->buffer: %s\n",
766,394✔
2262
                           pThreadInfo->buffer);
2263
                free_ds(&pThreadInfo->buffer);
766,334✔
2264
                pThreadInfo->buffer = new_ds(0);
766,504✔
2265
                break;
766,807✔
2266
            case SML_REST_IFACE:
1,288✔
2267
                memset(pThreadInfo->buffer, 0,
1,288✔
2268
                       g_arguments->reqPerReq * (pThreadInfo->max_sql_len + 1));
1,288✔
2269
            case SML_IFACE:
8,478✔
2270
                if (TSDB_SML_JSON_PROTOCOL == protocol
8,478✔
2271
                        || SML_JSON_TAOS_FORMAT == protocol) {
6,381✔
2272
                    debugPrint("pThreadInfo->lines[0]: %s\n",
2,097✔
2273
                               pThreadInfo->lines[0]);
2274
                    if (pThreadInfo->json_array && !g_arguments->terminate) {
1,204✔
2275
                        tools_cJSON_Delete(pThreadInfo->json_array);
1,162✔
2276
                        pThreadInfo->json_array = NULL;
1,176✔
2277
                    }
2278
                    pThreadInfo->json_array = tools_cJSON_CreateArray();
1,218✔
2279
                    if (pThreadInfo->lines && pThreadInfo->lines[0]) {
1,176✔
2280
                        tmfree(pThreadInfo->lines[0]);
1,176✔
2281
                        pThreadInfo->lines[0] = NULL;
1,176✔
2282
                    }
2283
                } else {
2284
                    for (int j = 0; j < generated; j++) {
117,111✔
2285
                        if (pThreadInfo && pThreadInfo->lines
110,681✔
2286
                                && !g_arguments->terminate) {
110,779✔
2287
                            debugPrint("pThreadInfo->lines[%d]: %s\n", j,
110,779✔
2288
                                       pThreadInfo->lines[j]);
2289
                            memset(pThreadInfo->lines[j], 0,
110,779✔
2290
                                   pThreadInfo->max_sql_len);
2291
                        }
2292
                    }
2293
                }
2294
                break;
7,606✔
2295
            case STMT_IFACE:
702,264✔
2296
                break;
702,264✔
2297
        }
2298

2299
        int64_t delay4 = endTs - startTs;
2,148,769✔
2300
        int64_t delay = delay1 + delay2 + delay3 + delay4;
2,148,769✔
2301
        if (delay <=0) {
2,148,769✔
2302
            debugPrint("thread[%d]: startTS: %"PRId64", endTS: %"PRId64"\n",
×
2303
                       pThreadInfo->threadID, startTs, endTs);
2304
        } else {
2305
            perfPrint("insert execution time is %10.2f ms\n",
2,148,769✔
2306
                      delay / 1E6);
2307

2308
            int64_t * pdelay = benchCalloc(1, sizeof(int64_t), false);
2,149,270✔
2309
            *pdelay = delay;
2,149,118✔
2310
            if (benchArrayPush(pThreadInfo->delayList, pdelay) == NULL) {
2,149,637✔
2311
                tmfree(pdelay);
×
2312
            }
2313
            pThreadInfo->totalDelay += delay;
2,148,333✔
2314
            pThreadInfo->totalDelay1 += delay1;
2,148,704✔
2315
            pThreadInfo->totalDelay2 += delay2;
2,147,638✔
2316
            pThreadInfo->totalDelay3 += delay3;
2,147,363✔
2317
        }
2318
        delay1 = delay2 = delay3 = 0;
2,148,587✔
2319

2320
        int64_t currentPrintTime = toolsGetTimestampMs();
2,148,587✔
2321
        if (currentPrintTime - lastPrintTime > 30 * 1000) {
2,148,575✔
2322
            infoPrint(
×
2323
                    "thread[%d] has currently inserted rows: %" PRIu64
2324
                    ", peroid insert rate: %.3f rows/s \n",
2325
                    pThreadInfo->threadID, pThreadInfo->totalInsertRows,
2326
                    (double)(pThreadInfo->totalInsertRows - lastTotalInsertRows) * 1000.0/(currentPrintTime - lastPrintTime));
2327
            lastPrintTime = currentPrintTime;
×
2328
                lastTotalInsertRows = pThreadInfo->totalInsertRows;
×
2329
        }
2330
    }
2331

2332
free_of_interlace:
10,346✔
2333
    cleanupAndPrint(pThreadInfo, "interlace");
8,301✔
2334
    if(csvFile) {
8,245✔
2335
        fclose(csvFile);
×
2336
    }
2337
    tmfree(tagData);
8,245✔
2338
    freeBindV(bindv);
8,245✔
2339
    return NULL;
8,245✔
2340
}
2341

2342
static int32_t prepareProgressDataStmt(
6,973,361✔
2343
        threadInfo *pThreadInfo,
2344
        SChildTable *childTbl,
2345
        int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt, int64_t *delay1, int64_t *delay2, int64_t *delay3) {
2346
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
6,973,361✔
2347
    char escapedTbName[TSDB_TABLE_NAME_LEN + 2] = "\0";
6,973,713✔
2348
    if (g_arguments->escape_character) {
6,972,999✔
2349
        (void)snprintf(escapedTbName, TSDB_TABLE_NAME_LEN + 2,
6,948,860✔
2350
                 "`%s`", childTbl->name);
2351
    } else {
2352
        (void)snprintf(escapedTbName, TSDB_TABLE_NAME_LEN, "%s",
24,540✔
2353
                 childTbl->name);
2354
    }
2355
    int64_t start = toolsGetTimestampUs();
6,973,400✔
2356
    if (taos_stmt_set_tbname(pThreadInfo->conn->stmt,
6,972,734✔
2357
                             escapedTbName)) {
2358
        errorPrint(
×
2359
                "taos_stmt_set_tbname(%s) failed,"
2360
                "reason: %s\n", escapedTbName,
2361
                taos_stmt_errstr(pThreadInfo->conn->stmt));
2362
        return -1;
×
2363
    }
2364
    *delay1 = toolsGetTimestampUs() - start;
6,957,754✔
2365
    int32_t n = 0;
6,962,954✔
2366
    int64_t pos = i % g_arguments->prepared_rand;
6,940,012✔
2367
    if (g_arguments->prepared_rand - pos < g_arguments->reqPerReq) {
6,951,459✔
2368
        // remain prepare data less than batch, reset pos to zero
2369
        pos = 0;
×
2370
    }
2371
    int32_t generated = bindParamBatch(
6,980,974✔
2372
            pThreadInfo,
2373
            (g_arguments->reqPerReq > (stbInfo->insertRows - i))
6,962,954✔
2374
                ? (stbInfo->insertRows - i)
2375
                : g_arguments->reqPerReq,
6,962,161✔
2376
            *timestamp, pos, childTbl, pkCur, pkCnt, &n, delay2, delay3);
2377
    *timestamp += n * stbInfo->timestamp_step;
6,967,896✔
2378
    return generated;
6,967,564✔
2379
}
2380

2381
/*
 * Randomly replace *timestamp with a value that lies before the super
 * table's start timestamp.
 *
 * A random number in [0, 100) is drawn; when it is below
 * stbInfo->disorderRatio, *timestamp becomes
 * startTimestamp - (disorderRange - 1), or startTimestamp - disorderRange
 * when disorderRange - 1 is zero. Otherwise *timestamp is left untouched.
 */
static void makeTimestampDisorder(
        int64_t *timestamp, SSuperTable *stbInfo) {
    int rand_num = taosRandom() % 100;
    if (rand_num >= stbInfo->disorderRatio) {
        // this row keeps its ordered timestamp
        return;
    }

    int64_t base = stbInfo->startTimestamp;
    int     back = stbInfo->disorderRange;
    back -= 1;
    if (back == 0) {
        // never step back by zero: fall back to the full range
        back = stbInfo->disorderRange;
    }

    *timestamp = base - back;
    debugPrint("rand_num: %d, < disorderRatio: %d"
               ", ts: %"PRId64"\n",
               rand_num,
               stbInfo->disorderRatio,
               *timestamp);
}
2,240✔
2399

2400
static int32_t prepareProgressDataSmlJsonText(
9,045✔
2401
    threadInfo *pThreadInfo,
2402
    uint64_t tableSeq,
2403
    int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt) {
2404
    // prepareProgressDataSmlJsonText
2405
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
9,045✔
2406
    int32_t generated = 0;
9,045✔
2407

2408
    int len = 0;
9,045✔
2409

2410
    char *line = pThreadInfo->lines[0];
9,045✔
2411
    uint32_t line_buf_len = pThreadInfo->line_buf_len;
9,045✔
2412

2413
    strncat(line + len, "[", 2);
9,045✔
2414
    len += 1;
9,045✔
2415

2416
    int32_t pos = 0;
9,045✔
2417
    for (int j = 0; (j < g_arguments->reqPerReq)
13,989✔
2418
            && !g_arguments->terminate; j++) {
57,349✔
2419
        strncat(line + len, "{", 2);
50,403✔
2420
        len += 1;
50,403✔
2421
        int n;
2422
        n = snprintf(line + len, line_buf_len - len,
50,403✔
2423
                 "\"timestamp\":%"PRId64",", *timestamp);
2424
        if (n < 0 || n >= line_buf_len - len) {
50,403✔
2425
            errorPrint("%s() LN%d snprintf overflow on %d\n",
×
2426
                       __func__, __LINE__, j);
2427
            return -1;
×
2428
        } else {
2429
            len += n;
50,403✔
2430
        }
2431

2432
        n = snprintf(line + len, line_buf_len - len, "%s",
50,403✔
2433
                        pThreadInfo->sml_json_value_array[tableSeq]);
50,403✔
2434
        if (n < 0 || n >= line_buf_len - len) {
50,403✔
2435
            errorPrint("%s() LN%d snprintf overflow on %d\n",
28✔
2436
                       __func__, __LINE__, j);
2437
            return -1;
×
2438
        } else {
2439
            len += n;
50,375✔
2440
        }
2441
        n = snprintf(line + len, line_buf_len - len, "\"tags\":%s,",
50,375✔
2442
                       pThreadInfo->sml_tags_json_array[tableSeq]);
50,375✔
2443
        if (n < 0 || n >= line_buf_len - len) {
50,375✔
UNCOV
2444
            errorPrint("%s() LN%d snprintf overflow on %d\n",
×
2445
                       __func__, __LINE__, j);
2446
            return -1;
×
2447
        } else {
2448
            len += n;
50,375✔
2449
        }
2450
        n = snprintf(line + len, line_buf_len - len,
50,375✔
2451
                       "\"metric\":\"%s\"}", stbInfo->stbName);
2452
        if (n < 0 || n >= line_buf_len - len) {
50,375✔
2453
            errorPrint("%s() LN%d snprintf overflow on %d\n",
×
2454
                       __func__, __LINE__, j);
2455
            return -1;
×
2456
        } else {
2457
            len += n;
50,403✔
2458
        }
2459

2460
        pos++;
50,403✔
2461
        if (pos >= g_arguments->prepared_rand) {
50,403✔
2462
            pos = 0;
4,046✔
2463
        }
2464

2465
        // primay key repeat ts count
2466
        if (!stbInfo->primary_key || needChangeTs(stbInfo, pkCur, pkCnt)) {
50,403✔
2467
            *timestamp += stbInfo->timestamp_step;
50,403✔
2468
        }
2469

2470
        if (stbInfo->disorderRatio > 0) {
50,403✔
2471
            makeTimestampDisorder(timestamp, stbInfo);
×
2472
        }
2473
        generated++;
50,389✔
2474
        if (i + generated >= stbInfo->insertRows) {
50,389✔
2475
            break;
7,029✔
2476
        }
2477
        if ((j+1) < g_arguments->reqPerReq) {
43,360✔
2478
            strncat(line + len, ",", 2);
41,344✔
2479
            len += 1;
41,344✔
2480
        }
2481
    }
2482
    strncat(line + len, "]", 2);
9,031✔
2483

2484
    debugPrint("%s() LN%d, lines[0]: %s\n",
9,031✔
2485
               __func__, __LINE__, pThreadInfo->lines[0]);
2486
    return generated;
9,045✔
2487
}
2488

2489
static int32_t prepareProgressDataSmlJson(
2,085✔
2490
    threadInfo *pThreadInfo,
2491
    uint64_t tableSeq,
2492
    int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt) {
2493
    // prepareProgressDataSmlJson
2494
    SDataBase *  database = pThreadInfo->dbInfo;
2,085✔
2495
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
2,085✔
2496
    int32_t generated = 0;
2,085✔
2497

2498
    int32_t pos = 0;
2,085✔
2499
    for (int j = 0; (j < g_arguments->reqPerReq)
2,085✔
2500
            && !g_arguments->terminate; j++) {
21,363✔
2501
        tools_cJSON *tag = tools_cJSON_Duplicate(
20,355✔
2502
                tools_cJSON_GetArrayItem(
20,396✔
2503
                    pThreadInfo->sml_json_tags,
20,355✔
2504
                    (int)tableSeq -
20,396✔
2505
                    pThreadInfo->start_table_from),
20,355✔
2506
                true);
2507
        debugPrintJsonNoTime(tag);
20,355✔
2508
        generateSmlTaosJsonCols(
20,396✔
2509
                pThreadInfo->json_array, tag, stbInfo,
2510
                database->sml_precision, *timestamp);
20,355✔
2511
        pos++;
20,355✔
2512
        if (pos >= g_arguments->prepared_rand) {
20,355✔
2513
            pos = 0;
2,030✔
2514
        }
2515

2516
        // primay key repeat ts count
2517
        if (!stbInfo->primary_key || needChangeTs(stbInfo, pkCur, pkCnt)) {
20,355✔
2518
            *timestamp += stbInfo->timestamp_step;
20,355✔
2519
        }
2520

2521
        if (stbInfo->disorderRatio > 0) {
20,355✔
2522
            makeTimestampDisorder(timestamp, stbInfo);
×
2523
        }
2524
        generated++;
20,355✔
2525
        if (i + generated >= stbInfo->insertRows) {
20,355✔
2526
            break;
1,077✔
2527
        }
2528
    }
2529

2530
    tmfree(pThreadInfo->lines[0]);
2,085✔
2531
    pThreadInfo->lines[0] = NULL;
2,085✔
2532
    pThreadInfo->lines[0] =
4,170✔
2533
            tools_cJSON_PrintUnformatted(
2,085✔
2534
                pThreadInfo->json_array);
2,085✔
2535
    debugPrint("pThreadInfo->lines[0]: %s\n",
2,085✔
2536
                   pThreadInfo->lines[0]);
2537

2538
    return generated;
2,085✔
2539
}
2540

2541
static int32_t prepareProgressDataSmlLineOrTelnet(
24,243✔
2542
    threadInfo *pThreadInfo, uint64_t tableSeq, char *sampleDataBuf,
2543
    int64_t *timestamp, uint64_t i, char *ttl, int protocol, int32_t *pkCur, int32_t *pkCnt) {
2544
    // prepareProgressDataSmlLine
2545
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
24,243✔
2546
    int32_t generated = 0;
24,243✔
2547

2548
    int32_t pos = 0;
24,243✔
2549
    for (int j = 0; (j < g_arguments->reqPerReq)
3,947,402✔
2550
            && !g_arguments->terminate; j++) {
7,908,667✔
2551
        // table index
2552
        int ti = tableSeq - pThreadInfo->start_table_from;
3,984,069✔
2553
        if (TSDB_SML_LINE_PROTOCOL == protocol) {
3,984,381✔
2554
            (void)snprintf(
6,798,253✔
2555
                    pThreadInfo->lines[j],
3,952,062✔
2556
                    stbInfo->lenOfCols + stbInfo->lenOfTags,
3,942,526✔
2557
                    "%s %s %" PRId64,
2558
                    pThreadInfo->sml_tags[ti],
3,941,473✔
2559
                    sampleDataBuf + pos * stbInfo->lenOfCols,
3,941,093✔
2560
                    *timestamp);
2561
        } else {
2562
            (void)snprintf(
53,530✔
2563
                    pThreadInfo->lines[j],
43,600✔
2564
                    stbInfo->lenOfCols + stbInfo->lenOfTags,
43,600✔
2565
                    "%s %" PRId64 " %s %s", stbInfo->stbName,
2566
                    *timestamp,
2567
                    sampleDataBuf
2568
                    + pos * stbInfo->lenOfCols,
43,600✔
2569
                    pThreadInfo->sml_tags[ti]);
43,600✔
2570
        }
2571
        //infoPrint("sml prepare j=%d stb=%s sml_tags=%s \n", j, stbInfo->stbName, pThreadInfo->sml_tags[ti]);
2572
        pos++;
2,907,806✔
2573
        if (pos >= g_arguments->prepared_rand) {
2,907,806✔
2574
            pos = 0;
3,808✔
2575
        }
2576
        // primay key repeat ts count
2577
        if (!stbInfo->primary_key || needChangeTs(stbInfo, pkCur, pkCnt)) {
3,981,320✔
2578
            *timestamp += stbInfo->timestamp_step;
3,980,300✔
2579
        }
2580

2581
        if (stbInfo->disorderRatio > 0) {
3,982,480✔
2582
            makeTimestampDisorder(timestamp, stbInfo);
2,240✔
2583
        }
2584
        generated++;
3,984,204✔
2585
        if (i + generated >= stbInfo->insertRows) {
3,984,204✔
2586
            break;
22,276✔
2587
        }
2588
    }
2589
    return generated;
24,222✔
2590
}
2591

2592
static int32_t prepareProgressDataSml(
35,422✔
2593
    threadInfo *pThreadInfo,
2594
    SChildTable *childTbl,
2595
    uint64_t tableSeq,
2596
    int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt) {
2597
    // prepareProgressDataSml
2598
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
35,422✔
2599

2600
    char *sampleDataBuf;
2601
    if (childTbl->useOwnSample) {
35,422✔
2602
        sampleDataBuf = childTbl->sampleDataBuf;
×
2603
    } else {
2604
        sampleDataBuf = stbInfo->sampleDataBuf;
35,373✔
2605
    }
2606
    int protocol = stbInfo->lineProtocol;
35,422✔
2607
    int32_t generated = -1;
35,373✔
2608
    switch (protocol) {
35,373✔
2609
        case TSDB_SML_LINE_PROTOCOL:
24,243✔
2610
        case TSDB_SML_TELNET_PROTOCOL:
2611
            generated = prepareProgressDataSmlLineOrTelnet(
24,243✔
2612
                    pThreadInfo,
2613
                    tableSeq,
2614
                    sampleDataBuf,
2615
                    timestamp, i, ttl, protocol, pkCur, pkCnt);
2616
            break;
24,292✔
2617
        case TSDB_SML_JSON_PROTOCOL:
9,045✔
2618
            generated = prepareProgressDataSmlJsonText(
9,045✔
2619
                    pThreadInfo,
2620
                    tableSeq - pThreadInfo->start_table_from,
9,045✔
2621
                timestamp, i, ttl, pkCur, pkCnt);
2622
            break;
9,045✔
2623
        case SML_JSON_TAOS_FORMAT:
2,085✔
2624
            generated = prepareProgressDataSmlJson(
2,085✔
2625
                    pThreadInfo,
2626
                    tableSeq,
2627
                    timestamp, i, ttl, pkCur, pkCnt);
2628
            break;
2,085✔
2629
        default:
×
2630
            errorPrint("%s() LN%d: unknown protcolor: %d\n",
×
2631
                       __func__, __LINE__, protocol);
2632
            break;
×
2633
    }
2634

2635
    return generated;
35,422✔
2636
}
2637

2638
// if return true, timestmap must add timestap_step, else timestamp no need changed
2639
bool needChangeTs(SSuperTable * stbInfo, int32_t *pkCur, int32_t *pkCnt) {
419,454✔
2640
    // check need generate cnt
2641
    if(*pkCnt == 0) {
419,454✔
2642
        if (stbInfo->repeat_ts_min >= stbInfo->repeat_ts_max) {
42,000✔
2643
            // fixed count value is max
2644
            if (stbInfo->repeat_ts_max == 0){
42,000✔
2645
                return true;
×
2646
            }
2647

2648
            *pkCnt = stbInfo->repeat_ts_max;
42,000✔
2649
        } else {
2650
            // random range
2651
            *pkCnt = RD(stbInfo->repeat_ts_max + 1);
×
2652
            if(*pkCnt < stbInfo->repeat_ts_min) {
×
2653
                *pkCnt = (*pkCnt + stbInfo->repeat_ts_min) % stbInfo->repeat_ts_max;
×
2654
            }
2655
        }
2656
    }
2657

2658
    // compare with current value
2659
    *pkCur = *pkCur + 1;
419,496✔
2660
    if(*pkCur >= *pkCnt) {
419,496✔
2661
        // reset zero
2662
        *pkCur = 0;
41,580✔
2663
        *pkCnt = 0;
41,580✔
2664
        return true;
41,580✔
2665
    } else {
2666
        // add one
2667
        return false;
377,916✔
2668
    }
2669
}
2670

2671
static int32_t prepareProgressDataSql(
20,786,359✔
2672
                    threadInfo *pThreadInfo,
2673
                    SChildTable *childTbl,
2674
                    char* tagData,
2675
                    uint64_t tableSeq,
2676
                    char *sampleDataBuf,
2677
                    int64_t *timestamp, uint64_t i, char *ttl,
2678
                    int32_t *pos, uint64_t *len, int32_t* pkCur, int32_t* pkCnt) {
2679
    // prepareProgressDataSql
2680
    int32_t generated = 0;
20,786,359✔
2681
    SDataBase *database = pThreadInfo->dbInfo;
20,786,359✔
2682
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
20,786,955✔
2683
    char *  pstr = pThreadInfo->buffer;
20,788,888✔
2684
    int disorderRange = stbInfo->disorderRange;
20,789,630✔
2685

2686
    if (stbInfo->partialColNum == stbInfo->cols->size) {
20,789,652✔
2687
        if (stbInfo->autoTblCreating) {
20,786,293✔
2688
            *len =
109,440✔
2689
                snprintf(pstr, TSDB_MAX_ALLOWED_SQL_LEN,
218,040✔
2690
                        g_arguments->escape_character
109,440✔
2691
                        ? "%s `%s`.`%s` USING `%s`.`%s` TAGS (%s) %s VALUES "
2692
                        : "%s %s.%s USING %s.%s TAGS (%s) %s VALUES ",
2693
                         STR_INSERT_INTO, database->dbName,
2694
                         childTbl->name, database->dbName,
2695
                         stbInfo->stbName,
2696
                         tagData +
2697
                         stbInfo->lenOfTags * tableSeq, ttl);
109,440✔
2698
        } else {
2699
            *len = snprintf(pstr, TSDB_MAX_ALLOWED_SQL_LEN,
41,349,698✔
2700
                    g_arguments->escape_character
20,678,857✔
2701
                           ? "%s `%s`.`%s` VALUES "
2702
                           : "%s %s.%s VALUES ",
2703
                           STR_INSERT_INTO,
2704
                           database->dbName, childTbl->name);
2705
        }
2706
    } else {
2707
        if (stbInfo->autoTblCreating) {
1,718✔
2708
            *len = snprintf(
1,568✔
2709
                    pstr, TSDB_MAX_ALLOWED_SQL_LEN,
2710
                    g_arguments->escape_character
784✔
2711
                    ? "%s `%s`.`%s` (%s) USING `%s`.`%s` TAGS (%s) %s VALUES "
2712
                    : "%s %s.%s (%s) USING %s.%s TAGS (%s) %s VALUES ",
2713
                    STR_INSERT_INTO, database->dbName,
2714
                    childTbl->name,
2715
                    stbInfo->partialColNameBuf,
2716
                    database->dbName, stbInfo->stbName,
2717
                    tagData +
2718
                    stbInfo->lenOfTags * tableSeq, ttl);
784✔
2719
        } else {
2720
            *len = snprintf(pstr, TSDB_MAX_ALLOWED_SQL_LEN,
1,760✔
2721
                    g_arguments->escape_character
934✔
2722
                    ? "%s `%s`.`%s` (%s) VALUES "
2723
                    : "%s %s.%s (%s) VALUES ",
2724
                    STR_INSERT_INTO, database->dbName,
2725
                    childTbl->name,
2726
                    stbInfo->partialColNameBuf);
2727
        }
2728
    }
2729

2730
    char *ownSampleDataBuf;
2731
    if (childTbl->useOwnSample) {
20,790,929✔
2732
        debugPrint("%s is using own sample data\n",
352✔
2733
                  childTbl->name);
2734
        ownSampleDataBuf = childTbl->sampleDataBuf;
352✔
2735
    } else {
2736
        ownSampleDataBuf = stbInfo->sampleDataBuf;
20,790,268✔
2737
    }
2738
    for (int j = 0; j < g_arguments->reqPerReq; j++) {
2,147,483,647✔
2739
        if (stbInfo->useSampleTs
2,147,483,647✔
2740
                && (!stbInfo->random_data_source)) {
3,248✔
2741
            *len +=
8,624✔
2742
                snprintf(pstr + *len,
3,248✔
2743
                         TSDB_MAX_ALLOWED_SQL_LEN - *len, "(%s)",
3,248✔
2744
                         sampleDataBuf +
2745
                         *pos * stbInfo->lenOfCols);
5,936✔
2746
        } else {
2747
            int64_t disorderTs = getDisorderTs(stbInfo, &disorderRange);
2,147,483,647✔
2748
            *len += snprintf(pstr + *len,
2,147,483,647✔
2749
                            TSDB_MAX_ALLOWED_SQL_LEN - *len,
2,147,483,647✔
2750
                            "(%" PRId64 ",%s)",
2751
                            disorderTs?disorderTs:*timestamp,
2752
                            ownSampleDataBuf +
2753
                            *pos * stbInfo->lenOfCols);
2,147,483,647✔
2754
        }
2755
        *pos += 1;
2,147,483,647✔
2756
        if (*pos >= g_arguments->prepared_rand) {
2,147,483,647✔
2757
            *pos = 0;
2,607,891✔
2758
        }
2759
        // primary key
2760
        if(!stbInfo->primary_key || needChangeTs(stbInfo, pkCur, pkCnt)) {
2,147,483,647✔
2761
            *timestamp += stbInfo->timestamp_step;
2,147,483,647✔
2762
        }
2763

2764
        generated++;
2,147,483,647✔
2765
        if (*len > (TSDB_MAX_ALLOWED_SQL_LEN
2,147,483,647✔
2766
            - stbInfo->lenOfCols)) {
2,147,483,647✔
2767
            break;
×
2768
        }
2769
        if (i + generated >= stbInfo->insertRows) {
2,147,483,647✔
2770
            break;
11,081,534✔
2771
        }
2772
    }
2773

2774
    return generated;
22,128,878✔
2775
}
2776

2777
void *syncWriteProgressive(void *sarg) {
171,354✔
2778
    threadInfo * pThreadInfo = (threadInfo *)sarg;
171,354✔
2779
    SDataBase *  database = pThreadInfo->dbInfo;
171,354✔
2780
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
172,702✔
2781

2782
    loadChildTableInfo(pThreadInfo);
172,702✔
2783

2784
    // special deal flow for TAOSC_IFACE
2785
    if (insertDataMix(pThreadInfo, database, stbInfo)) {
172,447✔
2786
        // request be dealt by this function , so return
2787
        return NULL;
1,530✔
2788
    }
2789

2790
    infoPrint(
171,214✔
2791
        "thread[%d] start progressive inserting into table from "
2792
        "%" PRIu64 " to %" PRIu64 "\n",
2793
        pThreadInfo->threadID, pThreadInfo->start_table_from,
2794
        pThreadInfo->end_table_to + 1);
2795

2796
    uint64_t  lastPrintTime = toolsGetTimestampMs();
171,228✔
2797
    uint64_t  lastTotalInsertRows = 0;
171,228✔
2798
    int64_t   startTs = toolsGetTimestampUs();
171,228✔
2799
    int64_t   endTs;
166,055✔
2800

2801
    FILE* csvFile = NULL;
171,228✔
2802
    char* tagData = NULL;
171,228✔
2803
    bool  stmt    = (stbInfo->iface == STMT_IFACE || stbInfo->iface == STMT2_IFACE) && stbInfo->autoTblCreating;
171,228✔
2804
    bool  smart   = SMART_IF_FAILED == stbInfo->continueIfFail;
171,228✔
2805
    bool  acreate = (stbInfo->iface == TAOSC_IFACE || stbInfo->iface == REST_IFACE) && stbInfo->autoTblCreating;
171,228✔
2806
    int   w       = 0;
171,228✔
2807
    if (stmt || smart || acreate) {
171,228✔
2808
        csvFile = openTagCsv(stbInfo, pThreadInfo->start_table_from);
9,495✔
2809
        tagData = benchCalloc(TAG_BATCH_COUNT, stbInfo->lenOfTags, false);
9,495✔
2810
    }
2811

2812
    bool oldInitStmt = stbInfo->autoTblCreating;
171,215✔
2813
    // stmt.  not auto table create call on stmt
2814
    if (stbInfo->iface == STMT_IFACE && !oldInitStmt) {
171,215✔
2815
        if (prepareStmt(pThreadInfo->conn->stmt, stbInfo, tagData, w, database->dbName)) {
5,333✔
2816
            g_fail = true;
×
2817
            goto free_of_progressive;
×
2818
        }
2819
    }
2820
    else if (stbInfo->iface == STMT2_IFACE && !stbInfo->autoTblCreating) {
165,882✔
2821
        if (prepareStmt2(pThreadInfo->conn->stmt2, stbInfo, tagData, w, database->dbName)) {
2,474✔
2822
            g_fail = true;
×
2823
            goto free_of_progressive;
×
2824
        }
2825
    }
2826

2827
    //
2828
    // loop write each child table
2829
    //
2830
    int16_t index = pThreadInfo->start_table_from;
171,215✔
2831
    for (uint64_t tableSeq = pThreadInfo->start_table_from;
171,215✔
2832
            tableSeq <= pThreadInfo->end_table_to; tableSeq++) {
18,240,185✔
2833
        char *sampleDataBuf;
2834
        SChildTable *childTbl;
2835

2836
        if (g_arguments->bind_vgroup) {
18,084,572✔
2837
            childTbl = pThreadInfo->vg->childTblArray[tableSeq];
940✔
2838
        } else {
2839
            childTbl = stbInfo->childTblArray[tableSeq];
18,088,876✔
2840
        }
2841
        debugPrint("tableSeq=%"PRId64" childTbl->name=%s\n", tableSeq, childTbl->name);
18,092,413✔
2842

2843
        if (childTbl->useOwnSample) {
18,093,107✔
2844
            sampleDataBuf = childTbl->sampleDataBuf;
698✔
2845
        } else {
2846
            sampleDataBuf = stbInfo->sampleDataBuf;
18,087,039✔
2847
        }
2848

2849
        int64_t  timestamp = pThreadInfo->start_time;
18,093,777✔
2850
        uint64_t len = 0;
18,095,051✔
2851
        int32_t pos = 0;
18,071,519✔
2852
        int32_t pkCur = 0; // record generate same timestamp current count
18,073,344✔
2853
        int32_t pkCnt = 0; // record generate same timestamp count
18,060,138✔
2854
        int64_t delay1 = 0;
18,061,422✔
2855
        int64_t delay2 = 0;
18,062,639✔
2856
        int64_t delay3 = 0;
18,065,014✔
2857

2858
        if(stmt || smart || acreate) {
18,066,730✔
2859
            // generator
2860
            if (w == 0) {
6,912,399✔
2861
                if(!generateTagData(stbInfo, tagData, TAG_BATCH_COUNT, csvFile, NULL, index)) {
71,955✔
2862
                    g_fail = true;
×
2863
                    goto free_of_progressive;
56✔
2864
                }
2865
            }
2866
        }
2867

2868
        // old init stmt must call for each table
2869
        if (stbInfo->iface == STMT_IFACE && oldInitStmt) {
18,066,730✔
2870
            if (prepareStmt(pThreadInfo->conn->stmt, stbInfo, tagData, w, database->dbName)) {
6,923,136✔
2871
                g_fail = true;
×
2872
                goto free_of_progressive;
×
2873
            }
2874
        }
2875
        else if (stbInfo->iface == STMT2_IFACE && stbInfo->autoTblCreating) {
11,153,368✔
2876
            if (prepareStmt2(pThreadInfo->conn->stmt2, stbInfo, tagData, w, database->dbName)) {
192✔
2877
                g_fail = true;
×
2878
                goto free_of_progressive;
×
2879
            }
2880
        }
2881

2882
        if(stmt || smart || acreate) {
18,099,459✔
2883
            // move next
2884
            if (++w >= TAG_BATCH_COUNT) {
6,944,706✔
2885
                // reset for gen again
2886
                w = 0;
69,400✔
2887
                index += TAG_BATCH_COUNT;
69,400✔
2888
            }
2889
        }
2890

2891
        char ttl[SMALL_BUFF_LEN] = "";
18,099,459✔
2892
        if (stbInfo->ttl != 0) {
18,097,605✔
2893
            (void)snprintf(ttl, SMALL_BUFF_LEN, "TTL %d", stbInfo->ttl);
1,404✔
2894
        }
2895
        for (uint64_t i = 0; i < stbInfo->insertRows;) {
28,051,133✔
2896
            if (g_arguments->terminate) {
28,046,323✔
2897
                goto free_of_progressive;
×
2898
            }
2899
            int32_t generated = 0;
28,047,064✔
2900
            switch (stbInfo->iface) {
28,047,064✔
2901
                case TAOSC_IFACE:
20,785,088✔
2902
                case REST_IFACE:
2903
                    generated = prepareProgressDataSql(
20,785,088✔
2904
                            pThreadInfo,
2905
                            childTbl,
2906
                            tagData,
2907
                            w,
2908
                            sampleDataBuf,
2909
                            &timestamp, i, ttl, &pos, &len, &pkCur, &pkCnt);
2910
                    break;
20,786,653✔
2911
                case STMT_IFACE: {
6,974,436✔
2912
                    generated = prepareProgressDataStmt(
6,974,436✔
2913
                            pThreadInfo,
2914
                            childTbl, &timestamp, i, ttl, &pkCur, &pkCnt, &delay1, &delay2, &delay3);
2915
                    break;
6,965,834✔
2916
                }
2917
                case STMT2_IFACE: {
251,396✔
2918
                    generated = stmt2BindAndSubmit(
251,396✔
2919
                            pThreadInfo,
2920
                            childTbl, &timestamp, i, ttl, &pkCur, &pkCnt, &delay1,
2921
                            &delay3, &startTs, &endTs, w);
2922
                    break;
251,530✔
2923
                }
2924
                case SML_REST_IFACE:
35,422✔
2925
                case SML_IFACE:
2926
                    generated = prepareProgressDataSml(
35,422✔
2927
                            pThreadInfo,
2928
                            childTbl,
2929
                            tableSeq, &timestamp, i, ttl, &pkCur, &pkCnt);
2930
                    break;
35,422✔
2931
                default:
99✔
2932
                    break;
99✔
2933
            }
2934
            if (generated < 0) {
28,039,538✔
2935
                g_fail = true;
×
2936
                goto free_of_progressive;
×
2937
            }
2938
            if (!stbInfo->non_stop) {
28,039,538✔
2939
                i += generated;
28,046,659✔
2940
            }
2941

2942
            // stmt2 execInsert already execute on stmt2BindAndSubmit
2943
            if (stbInfo->iface != STMT2_IFACE) {
28,047,075✔
2944
                // no stmt2 exec
2945
                startTs = toolsGetTimestampUs();
27,792,812✔
2946
                int code = execInsert(pThreadInfo, generated, &delay3);
27,783,848✔
2947
                if (code) {
27,794,291✔
2948
                    if (NO_IF_FAILED == stbInfo->continueIfFail) {
8,408✔
2949
                        warnPrint("The super table parameter "
56✔
2950
                                "continueIfFail: %d, STOP insertion!\n",
2951
                                stbInfo->continueIfFail);
2952
                        g_fail = true;
56✔
2953
                        goto free_of_progressive;
56✔
2954
                    } else if (YES_IF_FAILED == stbInfo->continueIfFail) {
8,352✔
2955
                        infoPrint("The super table parameter "
8,352✔
2956
                                "continueIfFail: %d, "
2957
                                "will continue to insert ..\n",
2958
                                stbInfo->continueIfFail);
2959
                    } else if (smart) {
×
2960
                        warnPrint("The super table parameter "
×
2961
                                "continueIfFail: %d, will create table "
2962
                                "then insert ..\n",
2963
                                stbInfo->continueIfFail);
2964

2965
                        // generator
2966
                        if (w == 0) {
×
2967
                            if(!generateTagData(stbInfo, tagData, TAG_BATCH_COUNT, csvFile, NULL, index)) {
×
2968
                                g_fail = true;
×
2969
                                goto free_of_progressive;
×
2970
                            }
2971
                        }
2972

2973
                        code = smartContinueIfFail(
×
2974
                                pThreadInfo,
2975
                                childTbl, tagData, w, ttl);
2976
                        if (0 != code) {
×
2977
                            g_fail = true;
×
2978
                            goto free_of_progressive;
×
2979
                        }
2980

2981
                        // move next
2982
                        if (++w >= TAG_BATCH_COUNT) {
×
2983
                            // reset for gen again
2984
                            w = 0;
×
2985
                            index += TAG_BATCH_COUNT;
×
2986
                        }
2987

2988
                        code = execInsert(pThreadInfo, generated, &delay3);
×
2989
                        if (code) {
×
2990
                            g_fail = true;
×
2991
                            goto free_of_progressive;
×
2992
                        }
2993
                    } else {
2994
                        warnPrint("Unknown super table parameter "
×
2995
                                "continueIfFail: %d\n",
2996
                                stbInfo->continueIfFail);
2997
                        g_fail = true;
×
2998
                        goto free_of_progressive;
×
2999
                    }
3000
                }
3001
                endTs = toolsGetTimestampUs() + 1;
27,794,235✔
3002
            }
3003

3004
            if (stbInfo->insert_interval > 0) {
28,043,732✔
3005
                debugPrint("%s() LN%d, insert_interval: %"PRIu64"\n",
129,610✔
3006
                          __func__, __LINE__, stbInfo->insert_interval);
3007
                perfPrint("sleep %" PRIu64 " ms\n",
129,610✔
3008
                              stbInfo->insert_interval);
3009
                toolsMsleep((int32_t)stbInfo->insert_interval);
129,610✔
3010
            }
3011

3012
            // flush
3013
            if (database->flush) {
28,037,120✔
3014
                char sql[260] = "";
39✔
3015
                (void)sprintf(sql, "flush database %s", database->dbName);
39✔
3016
                int32_t code = executeSql(pThreadInfo->conn->taos,sql);
39✔
3017
                if (code != 0) {
39✔
3018
                  perfPrint(" %s failed. error code = 0x%x\n", sql, code);
×
3019
                } else {
3020
                   perfPrint(" %s ok.\n", sql);
39✔
3021
                }
3022
            }
3023

3024
            pThreadInfo->totalInsertRows += generated;
28,044,114✔
3025

3026
            if (g_arguments->terminate) {
28,051,134✔
3027
                goto free_of_progressive;
×
3028
            }
3029
            int protocol = stbInfo->lineProtocol;
28,042,940✔
3030
            switch (stbInfo->iface) {
28,048,011✔
3031
                case REST_IFACE:
20,790,288✔
3032
                case TAOSC_IFACE:
3033
                    memset(pThreadInfo->buffer, 0, pThreadInfo->max_sql_len);
20,790,288✔
3034
                    break;
20,790,146✔
3035
                case SML_REST_IFACE:
966✔
3036
                    memset(pThreadInfo->buffer, 0,
966✔
3037
                           g_arguments->reqPerReq *
966✔
3038
                               (pThreadInfo->max_sql_len + 1));
966✔
3039
                case SML_IFACE:
35,422✔
3040
                    if (TSDB_SML_JSON_PROTOCOL == protocol) {
35,422✔
3041
                        memset(pThreadInfo->lines[0], 0,
9,045✔
3042
                           pThreadInfo->line_buf_len);
9,045✔
3043
                    } else if (SML_JSON_TAOS_FORMAT == protocol) {
26,377✔
3044
                        if (pThreadInfo->lines && pThreadInfo->lines[0]) {
2,085✔
3045
                            tmfree(pThreadInfo->lines[0]);
2,085✔
3046
                            pThreadInfo->lines[0] = NULL;
2,085✔
3047
                        }
3048
                        if (pThreadInfo->json_array) {
2,085✔
3049
                            tools_cJSON_Delete(pThreadInfo->json_array);
2,085✔
3050
                            pThreadInfo->json_array = NULL;
2,085✔
3051
                        }
3052
                        pThreadInfo->json_array = tools_cJSON_CreateArray();
2,085✔
3053
                    } else {
3054
                        for (int j = 0; j < generated; j++) {
3,992,024✔
3055
                            debugPrint("pThreadInfo->lines[%d]: %s\n",
3,969,156✔
3056
                                       j, pThreadInfo->lines[j]);
3057
                            memset(pThreadInfo->lines[j], 0,
3,971,964✔
3058
                                   pThreadInfo->max_sql_len);
3059
                        }
3060
                    }
3061
                    break;
35,422✔
3062
                case STMT_IFACE:
6,971,253✔
3063
                    break;
6,971,253✔
3064
            }
3065

3066
            int64_t delay4 = endTs - startTs;
28,037,752✔
3067
            int64_t delay = delay1 + delay2 + delay3 + delay4;
28,037,752✔
3068
            if (delay <= 0) {
28,037,752✔
3069
                debugPrint("thread[%d]: startTs: %"PRId64", endTs: %"PRId64"\n",
×
3070
                        pThreadInfo->threadID, startTs, endTs);
3071
            } else {
3072
                perfPrint("insert execution time is %.6f s\n",
28,037,752✔
3073
                              delay / 1E6);
3074

3075
                int64_t * pDelay = benchCalloc(1, sizeof(int64_t), false);
28,042,640✔
3076
                *pDelay = delay;
28,038,640✔
3077
                if (benchArrayPush(pThreadInfo->delayList, pDelay) == NULL) {
28,038,659✔
3078
                    tmfree(pDelay);
×
3079
                }
3080
                pThreadInfo->totalDelay += delay;
28,042,701✔
3081
                pThreadInfo->totalDelay1 += delay1;
28,046,344✔
3082
                pThreadInfo->totalDelay2 += delay2;
28,038,429✔
3083
                pThreadInfo->totalDelay3 += delay3;
28,046,032✔
3084
            }
3085
            delay1 = delay2 = delay3 = 0;
28,034,913✔
3086

3087
            int64_t currentPrintTime = toolsGetTimestampMs();
28,034,913✔
3088
            if (currentPrintTime - lastPrintTime > 30 * 1000) {
28,044,505✔
3089
                infoPrint(
139,162✔
3090
                        "thread[%d] has currently inserted rows: "
3091
                        "%" PRId64 ", peroid insert rate: %.3f rows/s \n",
3092
                        pThreadInfo->threadID, pThreadInfo->totalInsertRows,
3093
                        (double)(pThreadInfo->totalInsertRows - lastTotalInsertRows) * 1000.0/(currentPrintTime - lastPrintTime));
3094
                lastPrintTime = currentPrintTime;
139,162✔
3095
                lastTotalInsertRows = pThreadInfo->totalInsertRows;
139,162✔
3096
            }
3097
            if (i >= stbInfo->insertRows) {
28,044,436✔
3098
                break;
18,088,644✔
3099
            }
3100
        }  // insertRows
3101
    }      // tableSeq
3102
free_of_progressive:
169,240✔
3103
    cleanupAndPrint(pThreadInfo, "progressive");
171,215✔
3104
    if(csvFile) {
171,228✔
3105
        fclose(csvFile);
13✔
3106
    }
3107
    tmfree(tagData);
171,228✔
3108
    return NULL;
171,228✔
3109
}
3110

3111
uint64_t strToTimestamp(char * tsStr) {
668,690✔
3112
    uint64_t ts = 0;
668,690✔
3113
    // remove double quota mark
3114
    if (tsStr[0] == '\"' || tsStr[0] == '\'') {
668,690✔
3115
        tsStr += 1;
7,000✔
3116
        int32_t last = strlen(tsStr) - 1;
7,000✔
3117
        if (tsStr[last] == '\"' || tsStr[0] == '\'') {
7,000✔
3118
            tsStr[last] = 0;
7,000✔
3119
        }
3120
    }
3121

3122
    if (toolsParseTime(tsStr, (int64_t*)&ts, strlen(tsStr), TSDB_TIME_PRECISION_MILLI, 0)) {
668,690✔
3123
        // not timestamp str format, maybe int64 format
3124
        ts = (int64_t)atol(tsStr);
661,690✔
3125
    }
3126

3127
    return ts;
668,690✔
3128
}
3129

3130
/*
 * Parses the comma-separated sample rows into the per-column bind arrays
 * (data / is_null / lengths) used by the stmt interfaces.
 *
 * stbInfo       super table whose column list drives the parsing
 * childTbl      when non-NULL, the child table's own sample buffer and
 *               child columns are used; otherwise the super table's
 * bind_ts_array receives the timestamp of each row when useSampleTs is set
 *
 * Rows are read at a fixed stride of stbInfo->lenOfCols bytes, and
 * g_arguments->prepared_rand rows are processed.
 *
 * Returns 0 on success, -1 when a temporary field buffer cannot be
 * allocated. DECIMAL / DECIMAL64 columns terminate the process.
 *
 * NOTE(review): fields are split on the first ',' without regard to
 * quoting, so this presumably relies on sample values not containing
 * commas - confirm against the sample generator.
 */
static int initStmtDataValue(SSuperTable *stbInfo, SChildTable *childTbl, uint64_t *bind_ts_array) {
    int32_t columnCount = stbInfo->cols->size;

    char *sampleDataBuf;
    if (childTbl) {
        sampleDataBuf = childTbl->sampleDataBuf;
    } else {
        sampleDataBuf = stbInfo->sampleDataBuf;
    }
    int64_t lenOfOneRow = stbInfo->lenOfCols;

    if (stbInfo->useSampleTs) {
        columnCount += 1;  // for skipping first column
    }
    for (int i=0; i < g_arguments->prepared_rand; i++) {
        // byte offset of the current field inside row i
        int cursor = 0;

        for (int c = 0; c < columnCount; c++) {
            char *restStr = sampleDataBuf
                + lenOfOneRow * i + cursor;
            int lengthOfRest = strlen(restStr);

            // index = length of the current field (up to ',' or end of row)
            int index = 0;
            for (index = 0; index < lengthOfRest; index++) {
                if (restStr[index] == ',') {
                    break;
                }
            }

            cursor += index + 1;  // skip ',' too

            // NUL-terminated private copy of the field (calloc zero-fills)
            char *tmpStr = calloc(1, index + 1);
            if (NULL == tmpStr) {
                errorPrint("%s() LN%d, Failed to allocate %d bind buffer\n",
                        __func__, __LINE__, index + 1);
                return -1;
            }

            strncpy(tmpStr, restStr, index);
            if ((0 == c) && stbInfo->useSampleTs) {
                // set ts to
                bind_ts_array[i] = strToTimestamp(tmpStr);
                free(tmpStr);
                continue;
            }

            // with useSampleTs the first field is the timestamp, so field c
            // maps to column c-1
            Field *col = benchArrayGet(stbInfo->cols,
                    (stbInfo->useSampleTs?c-1:c));
            char dataType = col->type;

            StmtData *stmtData;
            if (childTbl) {
                ChildField *childCol =
                    benchArrayGet(childTbl->childCols,
                                  (stbInfo->useSampleTs?c-1:c));
                stmtData = &childCol->stmtData;
            } else {
                stmtData = &col->stmtData;
            }

            // set value
            stmtData->is_null[i] = 0;
            stmtData->lengths[i] = col->length;

            if (0 == strcmp(tmpStr, "NULL")) {
                *(stmtData->is_null + i) = true;
            } else {
                switch (dataType) {
                    case TSDB_DATA_TYPE_INT:
                        *((int32_t*)stmtData->data + i) = atoi(tmpStr);
                        break;
                    case TSDB_DATA_TYPE_UINT:
                        *((uint32_t*)stmtData->data + i) = (uint32_t)strtoul(tmpStr, NULL, 10);
                        break;
                    case TSDB_DATA_TYPE_FLOAT:
                        *((float*)stmtData->data +i) = (float)atof(tmpStr);
                        break;
                    case TSDB_DATA_TYPE_DOUBLE:
                        *((double*)stmtData->data + i) = atof(tmpStr);
                        break;
                    case TSDB_DATA_TYPE_TINYINT:
                    case TSDB_DATA_TYPE_UTINYINT:
                        *((int8_t*)stmtData->data + i) = (int8_t)atoi(tmpStr);
                        break;
                    case TSDB_DATA_TYPE_SMALLINT:
                    case TSDB_DATA_TYPE_USMALLINT:
                        *((int16_t*)stmtData->data + i) = (int16_t)atoi(tmpStr);
                        break;
                    case TSDB_DATA_TYPE_BIGINT:
                        *((int64_t*)stmtData->data + i) = (int64_t)strtoll(tmpStr, NULL, 10);
                        break;
                    case TSDB_DATA_TYPE_UBIGINT:
                    case TSDB_DATA_TYPE_TIMESTAMP:
                        *((uint64_t*)stmtData->data + i) = (uint64_t)strtoull(tmpStr, NULL, 10);
                        break;
                    case TSDB_DATA_TYPE_BOOL:
                        *((int8_t*)stmtData->data + i) = (int8_t)atoi(tmpStr);
                        break;
                    case TSDB_DATA_TYPE_BINARY:
                    case TSDB_DATA_TYPE_NCHAR:
                    case TSDB_DATA_TYPE_VARBINARY:
                    case TSDB_DATA_TYPE_GEOMETRY:
                    case TSDB_DATA_TYPE_BLOB:
                        {
                            // the payload is tmpStr without its first and
                            // last character (the surrounding quotes)
                            size_t tmpLen = strlen(tmpStr);
                            debugPrint("%s() LN%d, index: %d, "
                                    "tmpStr len: %"PRIu64", col->length: %d\n",
                                    __func__, __LINE__,
                                    i, (uint64_t)tmpLen, col->length);
                            // guard tmpLen > 2 first: tmpLen is unsigned, so
                            // tmpLen - 2 would wrap for values shorter than 2
                            if (tmpLen > 2 && tmpLen - 2 > col->length) {
                                errorPrint("data length %"PRIu64" "
                                        "is larger than column length %d\n",
                                        (uint64_t)tmpLen, col->length);
                            }
                            if (tmpLen > 2) {
                                strncpy((char *)stmtData->data
                                            + i * col->length,
                                        tmpStr+1,
                                        min(col->length, tmpLen - 2));
                            } else {
                                // empty payload: store an empty string
                                strncpy((char *)stmtData->data
                                            + i*col->length,
                                        "", 1);
                            }
                        }
                        break;
                    case TSDB_DATA_TYPE_DECIMAL:
                    case TSDB_DATA_TYPE_DECIMAL64:
                        errorPrint("Not implemented data type in func initStmtDataValue: %s\n",
                                convertDatatypeToString(dataType));
                        exit(EXIT_FAILURE);

                    default:
                        break;
                }
            }
            free(tmpStr);
        }
    }
    return 0;
}
3290

3291
static void initStmtData(char dataType, void **data, uint32_t length) {
56,359✔
3292
    char *tmpP = NULL;
56,359✔
3293

3294
    switch (dataType) {
56,359✔
3295
        case TSDB_DATA_TYPE_INT:
3,608✔
3296
        case TSDB_DATA_TYPE_UINT:
3297
            tmpP = calloc(1, sizeof(int) * g_arguments->prepared_rand);
3,608✔
3298
            assert(tmpP);
3,608✔
3299
            tmfree(*data);
3,608✔
3300
            *data = (void*)tmpP;
3,608✔
3301
            break;
3,608✔
3302

3303
        case TSDB_DATA_TYPE_TINYINT:
348✔
3304
        case TSDB_DATA_TYPE_UTINYINT:
3305
            tmpP = calloc(1, sizeof(int8_t) * g_arguments->prepared_rand);
348✔
3306
            assert(tmpP);
348✔
3307
            tmfree(*data);
348✔
3308
            *data = (void*)tmpP;
348✔
3309
            break;
348✔
3310

3311
        case TSDB_DATA_TYPE_SMALLINT:
31,835✔
3312
        case TSDB_DATA_TYPE_USMALLINT:
3313
            tmpP = calloc(1, sizeof(int16_t) * g_arguments->prepared_rand);
31,835✔
3314
            assert(tmpP);
31,835✔
3315
            tmfree(*data);
31,835✔
3316
            *data = (void*)tmpP;
31,835✔
3317
            break;
31,835✔
3318

3319
        case TSDB_DATA_TYPE_BIGINT:
1,002✔
3320
        case TSDB_DATA_TYPE_UBIGINT:
3321
            tmpP = calloc(1, sizeof(int64_t) * g_arguments->prepared_rand);
1,002✔
3322
            assert(tmpP);
1,002✔
3323
            tmfree(*data);
1,002✔
3324
            *data = (void*)tmpP;
1,002✔
3325
            break;
1,002✔
3326

3327
        case TSDB_DATA_TYPE_BOOL:
148✔
3328
            tmpP = calloc(1, sizeof(int8_t) * g_arguments->prepared_rand);
148✔
3329
            assert(tmpP);
148✔
3330
            tmfree(*data);
148✔
3331
            *data = (void*)tmpP;
148✔
3332
            break;
148✔
3333

3334
        case TSDB_DATA_TYPE_FLOAT:
10,674✔
3335
            tmpP = calloc(1, sizeof(float) * g_arguments->prepared_rand);
10,674✔
3336
            assert(tmpP);
10,674✔
3337
            tmfree(*data);
10,674✔
3338
            *data = (void*)tmpP;
10,674✔
3339
            break;
10,674✔
3340

3341
        case TSDB_DATA_TYPE_DOUBLE:
1,564✔
3342
            tmpP = calloc(1, sizeof(double) * g_arguments->prepared_rand);
1,564✔
3343
            assert(tmpP);
1,564✔
3344
            tmfree(*data);
1,564✔
3345
            *data = (void*)tmpP;
1,564✔
3346
            break;
1,564✔
3347

3348
        case TSDB_DATA_TYPE_BINARY:
6,961✔
3349
        case TSDB_DATA_TYPE_NCHAR:
3350
        case TSDB_DATA_TYPE_VARBINARY:
3351
        case TSDB_DATA_TYPE_GEOMETRY:
3352
            tmpP = calloc(1, g_arguments->prepared_rand * length);
6,961✔
3353
            assert(tmpP);
6,961✔
3354
            tmfree(*data);
6,961✔
3355
            *data = (void*)tmpP;
6,961✔
3356
            break;
6,961✔
3357

3358
        case TSDB_DATA_TYPE_TIMESTAMP:
219✔
3359
            tmpP = calloc(1, sizeof(int64_t) * g_arguments->prepared_rand);
219✔
3360
            assert(tmpP);
219✔
3361
            tmfree(*data);
219✔
3362
            *data = (void*)tmpP;
219✔
3363
            break;
219✔
3364

3365
        case TSDB_DATA_TYPE_DECIMAL:
×
3366
        case TSDB_DATA_TYPE_DECIMAL64:
3367
            errorPrint("Not implemented data type in func initStmtData: %s\n",
×
3368
                       convertDatatypeToString(dataType));
3369
            exit(EXIT_FAILURE);
×
3370

3371
        case TSDB_DATA_TYPE_BLOB: {
×
3372
            tmpP = calloc(1, g_arguments->prepared_rand * length);
×
3373
            assert(tmpP);
×
3374
            tmfree(*data);
×
3375
            *data = (void *)tmpP;
×
3376
            break;
×
3377
        }
3378
        default:
×
3379

3380
            errorPrint("Unknown data type on initStmtData: %s\n",
×
3381
                       convertDatatypeToString(dataType));
3382
            exit(EXIT_FAILURE);
×
3383
    }
3384
}
56,359✔
3385

3386
static int parseBufferToStmtBatchChildTbl(SSuperTable *stbInfo,
480✔
3387
                                          SChildTable* childTbl, uint64_t *bind_ts_array) {
3388
    int32_t columnCount = stbInfo->cols->size;
480✔
3389

3390
    for (int c = 0; c < columnCount; c++) {
1,752✔
3391
        Field *col = benchArrayGet(stbInfo->cols, c);
1,272✔
3392
        ChildField *childCol = benchArrayGet(childTbl->childCols, c);
1,272✔
3393
        char dataType = col->type;
1,272✔
3394

3395
        // malloc memory
3396
        tmfree(childCol->stmtData.is_null);
1,272✔
3397
        tmfree(childCol->stmtData.lengths);
1,272✔
3398
        childCol->stmtData.is_null = benchCalloc(sizeof(char),     g_arguments->prepared_rand, true);
1,272✔
3399
        childCol->stmtData.lengths = benchCalloc(sizeof(int32_t),  g_arguments->prepared_rand, true);
1,272✔
3400

3401
        initStmtData(dataType, &(childCol->stmtData.data), col->length);
1,272✔
3402
    }
3403

3404
    return initStmtDataValue(stbInfo, childTbl, bind_ts_array);
480✔
3405
}
3406

3407
/*
 * Prepares the stmt bind buffers shared by all child tables of a super
 * table: every column gets fresh is_null / lengths / data arrays sized for
 * g_arguments->prepared_rand rows, then initStmtDataValue() parses the
 * super table's sample buffer into them.
 *
 * Returns whatever initStmtDataValue() returns.
 */
static int parseBufferToStmtBatch(SSuperTable* stbInfo, uint64_t *bind_ts_array) {
    const int32_t nCols = stbInfo->cols->size;
    int32_t colIdx = 0;

    while (colIdx < nCols) {
        Field    *field = benchArrayGet(stbInfo->cols, colIdx);
        StmtData *bind  = &field->stmtData;

        // re-create the per-row arrays with prepared_rand elements each
        tmfree(bind->is_null);
        bind->is_null = benchCalloc(sizeof(char), g_arguments->prepared_rand, false);
        tmfree(bind->lengths);
        bind->lengths = benchCalloc(sizeof(int32_t), g_arguments->prepared_rand, false);

        initStmtData(field->type, &bind->data, field->length);
        colIdx++;
    }

    return initStmtDataValue(stbInfo, NULL, bind_ts_array);
}
3424

3425
/*
 * Give every entry of stbInfo->childTblArray a generated name of the form
 * "<childTblPrefix><index>", for index 0 .. childTblCount-1.
 *
 * Nothing is generated when stbInfo->useTagTableName is set.
 * Returns stbInfo->childTblCount in both cases.
 */
static int64_t fillChildTblNameByCount(SSuperTable *stbInfo) {
    if (!stbInfo->useTagTableName) {
        int64_t idx = 0;
        while (idx < stbInfo->childTblCount) {
            char nameBuf[TSDB_TABLE_NAME_LEN]={0};
            (void)snprintf(nameBuf,
                     TSDB_TABLE_NAME_LEN,
                     "%s%" PRIu64,
                     stbInfo->childTblPrefix, idx);
            // the array entry keeps its own heap copy of the name
            stbInfo->childTblArray[idx]->name = strdup(nameBuf);
            debugPrint("%s(): %s\n", __func__,
                      stbInfo->childTblArray[idx]->name);
            idx++;
        }
    }

    return stbInfo->childTblCount;
}
3443

3444
static int64_t fillChildTblNameByFromTo(SDataBase *database,
39✔
3445
        SSuperTable* stbInfo) {
3446
    for (int64_t i = stbInfo->childTblFrom; i <= stbInfo->childTblTo; i++) {
169✔
3447
        char childName[TSDB_TABLE_NAME_LEN]={0};
130✔
3448
        (void)snprintf(childName,
130✔
3449
                TSDB_TABLE_NAME_LEN,
3450
                "%s%" PRIu64,
3451
                stbInfo->childTblPrefix, i);
3452
        stbInfo->childTblArray[i]->name = strdup(childName);
130✔
3453
    }
3454

3455
    return (stbInfo->childTblTo-stbInfo->childTblFrom);
39✔
3456
}
3457

3458
static int64_t fillChildTblNameByLimitOffset(SDataBase *database,
290✔
3459
        SSuperTable* stbInfo) {
3460
    SBenchConn* conn = initBenchConn(database->dbName);
290✔
3461
    if (NULL == conn) {
290✔
3462
        return -1;
×
3463
    }
3464
    char cmd[SHORT_1K_SQL_BUFF_LEN] = "\0";
290✔
3465
    if (g_arguments->taosc_version == 3) {
290✔
3466
        (void)snprintf(cmd, SHORT_1K_SQL_BUFF_LEN,
290✔
3467
                 "SELECT DISTINCT(TBNAME) FROM %s.`%s` LIMIT %" PRId64
3468
                 " OFFSET %" PRIu64,
3469
                 database->dbName, stbInfo->stbName, stbInfo->childTblLimit,
3470
                 stbInfo->childTblOffset);
3471
    } else {
3472
        (void)snprintf(cmd, SHORT_1K_SQL_BUFF_LEN,
×
3473
                 "SELECT TBNAME FROM %s.`%s` LIMIT %" PRId64
3474
                 " OFFSET %" PRIu64,
3475
                 database->dbName, stbInfo->stbName, stbInfo->childTblLimit,
3476
                 stbInfo->childTblOffset);
3477
    }
3478
    debugPrint("cmd: %s\n", cmd);
290✔
3479
    TAOS_RES *res = taos_query(conn->taos, cmd);
290✔
3480
    int32_t   code = taos_errno(res);
290✔
3481
    int64_t   count = 0;
290✔
3482
    if (code) {
290✔
3483
        printErrCmdCodeStr(cmd, code, res);
42✔
3484
        closeBenchConn(conn);
42✔
3485
        return -1;
42✔
3486
    }
3487
    TAOS_ROW row = NULL;
248✔
3488
    while ((row = taos_fetch_row(res)) != NULL) {
965✔
3489
        int *lengths = taos_fetch_lengths(res);
717✔
3490
        char * childName = benchCalloc(1, lengths[0] + 1, true);
717✔
3491
        strncpy(childName, row[0], lengths[0]);
717✔
3492
        childName[lengths[0]] = '\0';
717✔
3493
        stbInfo->childTblArray[count]->name = childName;
717✔
3494
        debugPrint("stbInfo->childTblArray[%" PRId64 "]->name: %s\n",
717✔
3495
                   count, stbInfo->childTblArray[count]->name);
3496
        count++;
717✔
3497
    }
3498
    taos_free_result(res);
248✔
3499
    closeBenchConn(conn);
248✔
3500
    return count;
248✔
3501
}
3502

3503
/*
 * Reconcile interlaceRows, insertRows and the global records-per-request
 * setting before inserting into stbInfo:
 *   - interlaceRows is capped at g_arguments->reqPerReq;
 *   - interlaceRows larger than insertRows is reset to 0;
 *   - with interlaceRows == 0, g_arguments->reqPerReq is lowered to
 *     insertRows when it exceeds it (this modifies the global argument);
 *   - interlace + STMT_IFACE + autoTblCreating is rejected with exit(-1).
 */
static void preProcessArgument(SSuperTable *stbInfo) {
    // cap interlace rows at the request size
    if (g_arguments->reqPerReq < stbInfo->interlaceRows) {
        infoPrint(
            "interlaceRows(%d) is larger than record per request(%u), which "
            "will be set to %u\n",
            stbInfo->interlaceRows, g_arguments->reqPerReq,
            g_arguments->reqPerReq);
        stbInfo->interlaceRows = g_arguments->reqPerReq;
    }

    // interlace cannot exceed the rows inserted per table
    if (stbInfo->insertRows < stbInfo->interlaceRows) {
        infoPrint(
                "interlaceRows larger than insertRows %d > %" PRId64 "\n",
                stbInfo->interlaceRows, stbInfo->insertRows);
        infoPrint("%s", "interlaceRows will be set to 0\n");
        stbInfo->interlaceRows = 0;
    }

    // interlaceRows == 0: a request never needs more rows than a table gets
    if (stbInfo->interlaceRows == 0) {
        if (g_arguments->reqPerReq > stbInfo->insertRows) {
            infoPrint("record per request (%u) is larger than "
                    "insert rows (%"PRIu64")"
                    " in progressive mode, which will be set to %"PRIu64"\n",
                    g_arguments->reqPerReq, stbInfo->insertRows,
                    stbInfo->insertRows);
            g_arguments->reqPerReq = stbInfo->insertRows;
        }
    }

    // unsupported combination
    if (stbInfo->interlaceRows > 0) {
        if (stbInfo->iface == STMT_IFACE && stbInfo->autoTblCreating) {
            errorPrint("%s","stmt not support autocreate table with interlace row , quit programe!\n");
            exit(-1);
        }
    }
}
38,517✔
3537

3538
static int printTotalDelay(SDataBase *database,
38,517✔
3539
                           int64_t totalDelay,
3540
                           int64_t totalDelay1,
3541
                           int64_t totalDelay2,
3542
                           int64_t totalDelay3,
3543
                           BArray *total_delay_list,
3544
                            int threads,
3545
                            int64_t totalInsertRows,
3546
                            int64_t spend) {
3547
    // zero check
3548
    if (total_delay_list->size == 0 || spend == 0 || threads == 0) {
38,517✔
3549
        return -1;
810✔
3550
    }
3551

3552
    char subDelay[128] = "";
37,707✔
3553
    if(totalDelay1 + totalDelay2 + totalDelay3 > 0) {
37,707✔
3554
        (void)sprintf(subDelay, " stmt delay1=%.2fs delay2=%.2fs delay3=%.2fs",
3,441✔
3555
                totalDelay1/threads/1E6,
3,441✔
3556
                totalDelay2/threads/1E6,
3,441✔
3557
                totalDelay3/threads/1E6);
3,441✔
3558
    }
3559

3560
    double time_cost = spend / 1E6;
37,707✔
3561
    double real_time_cost = totalDelay/threads/1E6;
37,707✔
3562
    double records_per_second = (double)(totalInsertRows / (spend/1E6));
37,707✔
3563
    double real_records_per_second = (double)(totalInsertRows / (totalDelay/threads/1E6));
37,707✔
3564

3565
    succPrint("Spent %.6f (real %.6f) seconds to insert rows: %" PRIu64
37,707✔
3566
              " with %d thread(s) into %s %.2f (real %.2f) records/second%s\n",
3567
              time_cost, real_time_cost, totalInsertRows, threads,
3568
              database->dbName, records_per_second,
3569
              real_records_per_second, subDelay);
3570

3571
    if (!total_delay_list->size) {
37,707✔
3572
        return -1;
×
3573
    }
3574

3575
    double minDelay = *(int64_t *)(benchArrayGet(total_delay_list, 0))/1E3;
37,707✔
3576
    double avgDelay = (double)totalDelay/total_delay_list->size/1E3;
37,707✔
3577
    double p90 = *(int64_t *)(benchArrayGet(total_delay_list,
39,411✔
3578
                                         (int32_t)(total_delay_list->size
37,707✔
3579
                                         * 0.9)))/1E3;
37,707✔
3580
    double p95 = *(int64_t *)(benchArrayGet(total_delay_list,
39,411✔
3581
                                         (int32_t)(total_delay_list->size
37,707✔
3582
                                         * 0.95)))/1E3;
37,707✔
3583
    double p99 = *(int64_t *)(benchArrayGet(total_delay_list,
39,411✔
3584
                                         (int32_t)(total_delay_list->size
37,707✔
3585
                                         * 0.99)))/1E3;
37,707✔
3586
    double maxDelay = *(int64_t *)(benchArrayGet(total_delay_list,
39,411✔
3587
                                         (int32_t)(total_delay_list->size
37,707✔
3588
                                         - 1)))/1E3;
37,707✔
3589

3590
    succPrint("insert delay, "
37,707✔
3591
              "min: %.4fms, "
3592
              "avg: %.4fms, "
3593
              "p90: %.4fms, "
3594
              "p95: %.4fms, "
3595
              "p99: %.4fms, "
3596
              "max: %.4fms\n",
3597
            minDelay, avgDelay, p90, p95, p99, maxDelay);
3598

3599
    if (g_arguments->output_json_file) {
37,707✔
3600
        tools_cJSON *root = tools_cJSON_CreateObject();
60✔
3601
        if (root) {
60✔
3602
            tools_cJSON_AddStringToObject(root, "db_name", database->dbName);
60✔
3603
            tools_cJSON_AddNumberToObject(root, "inserted_rows", totalInsertRows);
60✔
3604
            tools_cJSON_AddNumberToObject(root, "threads", threads);
60✔
3605
            tools_cJSON_AddNumberToObject(root, "time_cost", time_cost);
60✔
3606
            tools_cJSON_AddNumberToObject(root, "real_time_cost", real_time_cost);
60✔
3607
            tools_cJSON_AddNumberToObject(root, "records_per_second",  records_per_second);
60✔
3608
            tools_cJSON_AddNumberToObject(root, "real_records_per_second", real_records_per_second);
60✔
3609

3610
            tools_cJSON_AddNumberToObject(root, "avg", avgDelay);
60✔
3611
            tools_cJSON_AddNumberToObject(root, "min", minDelay);
60✔
3612
            tools_cJSON_AddNumberToObject(root, "max", maxDelay);
60✔
3613
            tools_cJSON_AddNumberToObject(root, "p90", p90);
60✔
3614
            tools_cJSON_AddNumberToObject(root, "p95", p95);
60✔
3615
            tools_cJSON_AddNumberToObject(root, "p99", p99);
60✔
3616

3617
            char *jsonStr = tools_cJSON_PrintUnformatted(root);
60✔
3618
            if (jsonStr) {
60✔
3619
                FILE *fp = fopen(g_arguments->output_json_file, "w");
60✔
3620
                if (fp) {
60✔
3621
                    fprintf(fp, "%s\n", jsonStr);
60✔
3622
                    fclose(fp);
60✔
3623
                } else {
3624
                    errorPrint("Failed to open output JSON file, file name %s\n",
×
3625
                            g_arguments->output_json_file);
3626
                }
3627
                free(jsonStr);
60✔
3628
            }
3629
            tools_cJSON_Delete(root);
60✔
3630
        }
3631
    }
3632
    return 0;
37,707✔
3633
}
3634

3635
/*
 * Pick the naming strategy for existing child tables:
 *   childTblLimit set            -> names are queried with LIMIT/OFFSET,
 *   childTblFrom or childTblTo   -> names generated for that index range,
 *   otherwise                    -> names generated for all childTblCount.
 * Returns whatever the selected helper returns.
 */
static int64_t fillChildTblNameImp(SDataBase *database, SSuperTable *stbInfo) {
    if (stbInfo->childTblLimit) {
        return fillChildTblNameByLimitOffset(database, stbInfo);
    }

    if (stbInfo->childTblFrom || stbInfo->childTblTo) {
        return fillChildTblNameByFromTo(database, stbInfo);
    }

    return fillChildTblNameByCount(stbInfo);
}
3646

3647
/*
 * Allocate stbInfo->childTblArray (childTblCount entries, each a zeroed
 * SChildTable) and assign the table names.
 *
 *   - exactly one table and no tags: treated as a normal table, named after
 *     stbInfo->stbName;
 *   - non-schemaless interface with childTblExists: delegated to
 *     fillChildTblNameImp();
 *   - anything else: names generated by fillChildTblNameByCount().
 *
 * Returns the table count reported by the chosen path (childTblCount for
 * the normal-table case).
 */
static int64_t fillChildTblName(SDataBase *database, SSuperTable *stbInfo) {
    const int64_t total = stbInfo->childTblCount;

    stbInfo->childTblArray = benchCalloc(stbInfo->childTblCount,
            sizeof(SChildTable*), true);
    int64_t slot = 0;
    while (slot < stbInfo->childTblCount) {
        stbInfo->childTblArray[slot] =
            benchCalloc(1, sizeof(SChildTable), true);
        slot++;
    }

    // Normal table
    if (stbInfo->childTblCount == 1 && stbInfo->tags->size == 0) {
        char nameBuf[TSDB_TABLE_NAME_LEN]={0};
        (void)snprintf(nameBuf, TSDB_TABLE_NAME_LEN,
                    "%s", stbInfo->stbName);
        stbInfo->childTblArray[0]->name = strdup(nameBuf);
        return total;
    }

    bool schemaless = (stbInfo->iface == SML_IFACE
                    || stbInfo->iface == SML_REST_IFACE);
    if (!schemaless && stbInfo->childTblExists) {
        return fillChildTblNameImp(database, stbInfo);
    }

    return fillChildTblNameByCount(stbInfo);
}
3672

3673
// last ts fill to filllBackTime
3674
static bool fillSTableLastTs(SDataBase *database, SSuperTable *stbInfo) {
×
3675
    SBenchConn* conn = initBenchConn(database->dbName);
×
3676
    if (NULL == conn) {
×
3677
        return false;
×
3678
    }
3679
    char cmd[SHORT_1K_SQL_BUFF_LEN] = "\0";
×
3680
    (void)snprintf(cmd, SHORT_1K_SQL_BUFF_LEN, "select last(ts) from %s.`%s`", database->dbName, stbInfo->stbName);
×
3681

3682
    infoPrint("fillBackTime: %s\n", cmd);
×
3683
    TAOS_RES *res = taos_query(conn->taos, cmd);
×
3684
    int32_t   code = taos_errno(res);
×
3685
    if (code) {
×
3686
        printErrCmdCodeStr(cmd, code, res);
×
3687
        closeBenchConn(conn);
×
3688
        return false;
×
3689
    }
3690

3691
    TAOS_ROW row = taos_fetch_row(res);
×
3692
    if(row == NULL) {
×
3693
        taos_free_result(res);
×
3694
        closeBenchConn(conn);
×
3695
        return false;
×
3696
    }
3697

3698
    char lastTs[128];
×
3699
    memset(lastTs, 0, sizeof(lastTs));
×
3700

3701
    stbInfo->startFillbackTime = *(int64_t*)row[0];
×
3702
    toolsFormatTimestamp(lastTs, stbInfo->startFillbackTime, database->precision);
×
3703
    infoPrint("fillBackTime: get ok %s.%s last ts=%s \n", database->dbName, stbInfo->stbName, lastTs);
×
3704

3705
    taos_free_result(res);
×
3706
    closeBenchConn(conn);
×
3707

3708
    return true;
×
3709
}
3710

3711
// calcNow expression fill to timestamp_start
3712
static bool calcExprFromServer(SDataBase *database, SSuperTable *stbInfo) {
1,668✔
3713
    SBenchConn* conn = initBenchConn(database->dbName);
1,668✔
3714
    if (NULL == conn) {
1,668✔
3715
        return false;
×
3716
    }
3717
    char cmd[SHORT_1K_SQL_BUFF_LEN] = "\0";
1,668✔
3718
    (void)snprintf(cmd, SHORT_1K_SQL_BUFF_LEN, "select %s", stbInfo->calcNow);
1,668✔
3719

3720
    infoPrint("calcExprFromServer: %s\n", cmd);
1,668✔
3721
    TAOS_RES *res = taos_query(conn->taos, cmd);
1,668✔
3722
    int32_t   code = taos_errno(res);
1,668✔
3723
    if (code) {
1,668✔
3724
        printErrCmdCodeStr(cmd, code, res);
×
3725
        closeBenchConn(conn);
×
3726
        return false;
×
3727
    }
3728

3729
    TAOS_ROW row = taos_fetch_row(res);
1,668✔
3730
    if(row == NULL) {
1,668✔
3731
        taos_free_result(res);
×
3732
        closeBenchConn(conn);
×
3733
        return false;
×
3734
    }
3735

3736
    char ts[128];
1,668✔
3737
    memset(ts, 0, sizeof(ts));
1,668✔
3738

3739
    stbInfo->startTimestamp = *(int64_t*)row[0];
1,668✔
3740
    toolsFormatTimestamp(ts, stbInfo->startTimestamp, database->precision);
1,668✔
3741
    infoPrint("calcExprFromServer: get ok.  %s = %s \n", stbInfo->calcNow, ts);
1,668✔
3742

3743
    taos_free_result(res);
1,668✔
3744
    closeBenchConn(conn);
1,668✔
3745

3746
    return true;
1,668✔
3747
}
3748

3749
/*
 * Number of child tables that will be written for stbInfo:
 *   childTblTo > 0                        -> childTblTo - childTblFrom
 *   childTblLimit > 0 and tables exist    -> childTblLimit
 *   otherwise                             -> childTblCount
 * database is not used.
 */
int64_t obtainTableCount(SDataBase* database, SSuperTable* stbInfo) {
    if (stbInfo->childTblTo > 0) {
        return stbInfo->childTblTo - stbInfo->childTblFrom;
    }

    if (stbInfo->childTblLimit > 0 && stbInfo->childTblExists) {
        return stbInfo->childTblLimit;
    }

    return stbInfo->childTblCount;
}
3762

3763
// assign table to thread with vgroups, return assign thread count
3764
int32_t assignTableToThread(SDataBase* database, SSuperTable* stbInfo) {
347✔
3765
    int32_t threads = 0;
347✔
3766

3767
    // calc table count per vgroup
3768
    for (int64_t i = 0; i < stbInfo->childTblCount; i++) {
11,084✔
3769
        int32_t vgIdx = calcGroupIndex(database->dbName, stbInfo->childTblArray[i]->name, database->vgroups);
10,737✔
3770
        if (vgIdx == -1) {
10,737✔
3771
            continue;
×
3772
        }
3773
        SVGroup *vg = benchArrayGet(database->vgArray, vgIdx);
10,737✔
3774
        vg->tbCountPerVgId ++;
10,737✔
3775
    }
3776

3777
    // malloc vg->childTblArray memory with table count
3778
    for (int v = 0; v < database->vgroups; v++) {
1,609✔
3779
        SVGroup *vg = benchArrayGet(database->vgArray, v);
1,262✔
3780
        infoPrint("Local hash calc %"PRId64" tables on %s's vgroup %d (id: %d)\n",
1,262✔
3781
                    vg->tbCountPerVgId, database->dbName, v, vg->vgId);
3782
        if (vg->tbCountPerVgId) {
1,262✔
3783
            threads++;
1,184✔
3784
        } else {
3785
            continue;
78✔
3786
        }
3787
        vg->childTblArray = benchCalloc(vg->tbCountPerVgId, sizeof(SChildTable*), true);
1,184✔
3788
        vg->tbOffset      = 0;
1,184✔
3789
    }
3790

3791
    // set vg->childTblArray data
3792
    for (int64_t i = 0; i < stbInfo->childTblCount; i++) {
11,084✔
3793
        int32_t vgIdx = calcGroupIndex(database->dbName, stbInfo->childTblArray[i]->name, database->vgroups);
10,737✔
3794
        if (vgIdx == -1) {
10,737✔
3795
            continue;
×
3796
        }
3797
        SVGroup *vg = benchArrayGet(database->vgArray, vgIdx);
10,737✔
3798
        debugPrint("calc table hash to vgroup %s.%s vgIdx=%d\n",
10,737✔
3799
                    database->dbName,
3800
                    stbInfo->childTblArray[i]->name, vgIdx);
3801
        vg->childTblArray[vg->tbOffset] = stbInfo->childTblArray[i];
10,737✔
3802
        vg->tbOffset++;
10,737✔
3803
    }
3804
    return threads;
347✔
3805
}
3806

3807
// init stmt
3808
TAOS_STMT* initStmt(TAOS* taos, bool single) {
14,013✔
3809
    if (!single) {
14,013✔
3810
        infoPrint("initStmt call taos_stmt_init single=%d\n", single);
×
3811
        return taos_stmt_init(taos);
×
3812
    }
3813

3814
    TAOS_STMT_OPTIONS op;
13,501✔
3815
    memset(&op, 0, sizeof(op));
14,013✔
3816
    op.singleStbInsert      = single;
14,013✔
3817
    op.singleTableBindOnce  = single;
14,013✔
3818
    infoPrint("initStmt call taos_stmt_init_with_options single=%d\n", single);
14,013✔
3819
    return taos_stmt_init_with_options(taos, &op);
14,013✔
3820
}
3821

3822
// init stmt2
3823
TAOS_STMT2* initStmt2(TAOS* taos, bool single) {
4,349✔
3824
    TAOS_STMT2_OPTION op2;
3,936✔
3825
    memset(&op2, 0, sizeof(op2));
4,349✔
3826
    op2.singleStbInsert      = single;
4,349✔
3827
    op2.singleTableBindOnce  = single;
4,349✔
3828

3829
    TAOS_STMT2* stmt2 = taos_stmt2_init(taos, &op2);
4,349✔
3830
    if (stmt2)
4,349✔
3831
        succPrint("succ  taos_stmt2_init single=%d\n", single);
4,349✔
3832
    else
3833
        errorPrint("failed taos_stmt2_init single=%d\n", single);
×
3834
    return stmt2;
4,349✔
3835
}
3836

3837
// init insert thread
3838
void initTsArray(uint64_t *bind_ts_array, SSuperTable* stbInfo) {
4,237✔
3839
    parseBufferToStmtBatch(stbInfo, bind_ts_array);
4,237✔
3840
    for (int64_t child = 0; child < stbInfo->childTblCount; child++) {
14,973,654✔
3841
        SChildTable *childTbl = stbInfo->childTblArray[child];
14,969,417✔
3842
        if (childTbl->useOwnSample) {
14,969,417✔
3843
            parseBufferToStmtBatchChildTbl(stbInfo, childTbl, bind_ts_array);
480✔
3844
        }
3845
    }
3846

3847
}
4,237✔
3848

3849
void *genInsertTheadInfo(void* arg) {
180,950✔
3850

3851
    if (g_arguments->terminate || g_fail) {
180,950✔
UNCOV
3852
        return NULL;
×
3853
    }
3854

3855
    threadInfo* pThreadInfo = (threadInfo*)arg;
180,975✔
3856
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
180,975✔
3857
    pThreadInfo->delayList = benchArrayInit(1, sizeof(int64_t));
180,975✔
3858
    switch (stbInfo->iface) {
180,975✔
3859
        // rest
3860
        case REST_IFACE: {
56✔
3861
            if (stbInfo->interlaceRows > 0) {
56✔
3862
                pThreadInfo->buffer = new_ds(0);
×
3863
            } else {
3864
                pThreadInfo->buffer = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, true);
56✔
3865
            }
3866
            int sockfd = createSockFd();
56✔
3867
            if (sockfd < 0) {
56✔
3868
                g_fail = true;
×
3869
                goto END;
×
3870
            }
3871
            pThreadInfo->sockfd = sockfd;
56✔
3872
            break;
56✔
3873
        }
3874
        // stmt & stmt2 init
3875
        case STMT_IFACE:
18,362✔
3876
        case STMT2_IFACE: {
3877
            pThreadInfo->conn = initBenchConn(pThreadInfo->dbInfo->dbName);
18,362✔
3878
            if (NULL == pThreadInfo->conn) {
18,362✔
3879
                goto END;
×
3880
            }
3881
            // single always true for benchmark
3882
            bool single = true;
18,362✔
3883
            if (stbInfo->iface == STMT2_IFACE) {
18,362✔
3884
                // stmt2 init
3885
                if (pThreadInfo->conn->stmt2)
4,349✔
3886
                    taos_stmt2_close(pThreadInfo->conn->stmt2);
×
3887
                pThreadInfo->conn->stmt2 = initStmt2(pThreadInfo->conn->taos, single);
4,349✔
3888
                if (NULL == pThreadInfo->conn->stmt2) {
4,349✔
3889
                    errorPrint("taos_stmt2_init() failed, reason: %s\n", taos_errstr(NULL));
×
3890
                    g_fail = true;
×
3891
                    goto END;
×
3892
                }
3893
            } else {
3894
                // stmt init
3895
                if (pThreadInfo->conn->stmt)
14,013✔
3896
                    taos_stmt_close(pThreadInfo->conn->stmt);
×
3897
                pThreadInfo->conn->stmt = initStmt(pThreadInfo->conn->taos, single);
14,013✔
3898
                if (NULL == pThreadInfo->conn->stmt) {
14,013✔
3899
                    errorPrint("taos_stmt_init() failed, reason: %s\n", taos_errstr(NULL));
×
3900
                    g_fail = true;
×
3901
                    goto END;
×
3902
                }
3903
            }
3904

3905
            // select db
3906
            if (taos_select_db(pThreadInfo->conn->taos, pThreadInfo->dbInfo->dbName)) {
18,362✔
3907
                errorPrint("taos select database(%s) failed\n", pThreadInfo->dbInfo->dbName);
×
3908
                g_fail = true;
×
3909
                goto END;
×
3910
            }
3911

3912
            // malloc bind
3913
            int32_t unit = stbInfo->iface == STMT2_IFACE ? sizeof(TAOS_STMT2_BIND) : sizeof(TAOS_MULTI_BIND);
18,362✔
3914
            pThreadInfo->bind_ts       = benchCalloc(1, sizeof(int64_t), true);
18,362✔
3915

3916
            pThreadInfo->bindParams    = benchCalloc(1, unit * (stbInfo->cols->size + 1), true);
18,362✔
3917
            // have ts columns, so size + 1
3918
            pThreadInfo->lengths       = benchCalloc(stbInfo->cols->size + 1, sizeof(int32_t*), true);
18,362✔
3919
            for(int32_t c = 0; c <= stbInfo->cols->size; c++) {
247,787✔
3920
                pThreadInfo->lengths[c] = benchCalloc(g_arguments->reqPerReq, sizeof(int32_t), true);
229,560✔
3921
            }
3922
            // tags data
3923
            pThreadInfo->tagsStmt = copyBArray(stbInfo->tags);
18,362✔
3924
            for(int32_t n = 0; n < pThreadInfo->tagsStmt->size; n ++ ) {
81,745✔
3925
                Field *field = benchArrayGet(pThreadInfo->tagsStmt, n);
63,383✔
3926
                memset(&field->stmtData, 0, sizeof(StmtData));
63,383✔
3927
            }
3928

3929
            break;
18,362✔
3930
        }
3931
        // sml rest
3932
        case SML_REST_IFACE: {
518✔
3933
            int sockfd = createSockFd();
518✔
3934
            if (sockfd < 0) {
518✔
3935
                g_fail = true;
×
3936
                goto END;
×
3937
            }
3938
            pThreadInfo->sockfd = sockfd;
518✔
3939
        }
3940
        // sml
3941
        case SML_IFACE: {
12,005✔
3942
            if (stbInfo->iface == SML_IFACE) {
12,005✔
3943
                pThreadInfo->conn = initBenchConn(pThreadInfo->dbInfo->dbName);
11,473✔
3944
                if (pThreadInfo->conn == NULL) {
11,501✔
3945
                    errorPrint("%s() init connection failed\n", __func__);
×
3946
                    g_fail = true;
×
3947
                    goto END;
×
3948
                }
3949
                if (taos_select_db(pThreadInfo->conn->taos, pThreadInfo->dbInfo->dbName)) {
11,501✔
3950
                    errorPrint("taos select database(%s) failed\n", pThreadInfo->dbInfo->dbName);
×
3951
                    g_fail = true;
×
3952
                    goto END;
×
3953
                }
3954
            }
3955
            pThreadInfo->max_sql_len = stbInfo->lenOfCols + stbInfo->lenOfTags;
12,033✔
3956
            if (stbInfo->iface == SML_REST_IFACE) {
12,033✔
3957
                pThreadInfo->buffer = benchCalloc(1, g_arguments->reqPerReq * (1 + pThreadInfo->max_sql_len), true);
518✔
3958
            }
3959
            int protocol = stbInfo->lineProtocol;
12,033✔
3960
            if (TSDB_SML_JSON_PROTOCOL != protocol && SML_JSON_TAOS_FORMAT != protocol) {
19,776✔
3961
                pThreadInfo->sml_tags = (char **)benchCalloc(pThreadInfo->ntables, sizeof(char *), true);
7,729✔
3962
                for (int t = 0; t < pThreadInfo->ntables; t++) {
41,022✔
3963
                    pThreadInfo->sml_tags[t] = benchCalloc(1, stbInfo->lenOfTags, true);
33,244✔
3964
                }
3965
                int64_t index = pThreadInfo->start_table_from;
7,729✔
3966
                for (int t = 0; t < pThreadInfo->ntables; t++) {
41,022✔
3967
                    if (generateRandData(
63,954✔
3968
                                stbInfo, pThreadInfo->sml_tags[t],
33,293✔
3969
                                stbInfo->lenOfTags,
33,293✔
3970
                                stbInfo->lenOfCols + stbInfo->lenOfTags,
33,293✔
3971
                                stbInfo->tags, 1, true, NULL, index++)) {
3972
                        g_fail = true;
×
3973
                        goto END;
×
3974
                    }
3975
                    debugPrint("pThreadInfo->sml_tags[%d]: %s\n", t,
33,293✔
3976
                               pThreadInfo->sml_tags[t]);
3977
                }
3978
                pThreadInfo->lines = benchCalloc(g_arguments->reqPerReq, sizeof(char *), true);
7,729✔
3979
                for (int j = 0; (j < g_arguments->reqPerReq && !g_arguments->terminate); j++) {
8,116,107✔
3980
                    pThreadInfo->lines[j] = benchCalloc(1, pThreadInfo->max_sql_len, true);
8,113,927✔
3981
                }
3982
            } else {
3983
                pThreadInfo->json_array          = tools_cJSON_CreateArray();
4,304✔
3984
                pThreadInfo->sml_json_tags       = tools_cJSON_CreateArray();
4,290✔
3985
                pThreadInfo->sml_tags_json_array = (char **)benchCalloc( pThreadInfo->ntables, sizeof(char *), true);
4,290✔
3986
                for (int t = 0; t < pThreadInfo->ntables; t++) {
12,732✔
3987
                    if (stbInfo->lineProtocol == TSDB_SML_JSON_PROTOCOL) {
8,442✔
3988
                        generateSmlJsonTags(
7,365✔
3989
                            pThreadInfo->sml_json_tags,
3990
                                pThreadInfo->sml_tags_json_array,
3991
                                stbInfo,
3992
                            pThreadInfo->start_table_from, t);
3993
                    } else {
3994
                        generateSmlTaosJsonTags(
1,077✔
3995
                            pThreadInfo->sml_json_tags, stbInfo,
3996
                            pThreadInfo->start_table_from, t);
3997
                    }
3998
                }
3999
                pThreadInfo->lines = (char **)benchCalloc(1, sizeof(char *), true);
4,290✔
4000
                if (0 == stbInfo->interlaceRows && TSDB_SML_JSON_PROTOCOL == protocol) {
4,290✔
4001
                    pThreadInfo->line_buf_len = g_arguments->reqPerReq * accumulateRowLen(pThreadInfo->stbInfo->tags, pThreadInfo->stbInfo->iface);
3,549✔
4002
                    debugPrint("%s() LN%d, line_buf_len=%d\n", __func__, __LINE__, pThreadInfo->line_buf_len);
3,549✔
4003
                    pThreadInfo->lines[0]             = benchCalloc(1, pThreadInfo->line_buf_len, true);
3,549✔
4004
                    pThreadInfo->sml_json_value_array = (char **)benchCalloc(pThreadInfo->ntables, sizeof(char *), true);
3,549✔
4005
                    for (int t = 0; t < pThreadInfo->ntables; t++) {
10,550✔
4006
                        generateSmlJsonValues(pThreadInfo->sml_json_value_array, stbInfo, t);
7,015✔
4007
                    }
4008
                }
4009
            }
4010
            break;
12,019✔
4011
        }
4012
        // taos
4013
        case TAOSC_IFACE: {
150,552✔
4014
            pThreadInfo->conn = initBenchConn(pThreadInfo->dbInfo->dbName);
150,552✔
4015
            if (pThreadInfo->conn == NULL) {
150,566✔
4016
                errorPrint("%s() failed to connect\n", __func__);
×
4017
                g_fail = true;
×
4018
                goto END;
×
4019
            }
4020
            char* command = benchCalloc(1, SHORT_1K_SQL_BUFF_LEN, false);
150,566✔
4021
            (void)snprintf(command, SHORT_1K_SQL_BUFF_LEN,
150,566✔
4022
                    g_arguments->escape_character ? "USE `%s`" : "USE %s",
150,566✔
4023
                    pThreadInfo->dbInfo->dbName);
150,566✔
4024
            if (queryDbExecCall(pThreadInfo->conn, command)) {
150,566✔
4025
                errorPrint("taos select database(%s) failed\n", pThreadInfo->dbInfo->dbName);
×
4026
                g_fail = true;
×
4027
                goto END;
×
4028
            }
4029
            tmfree(command);
149,944✔
4030
            command = NULL;
150,566✔
4031

4032
            if (stbInfo->interlaceRows > 0) {
150,566✔
4033
                pThreadInfo->buffer = new_ds(0);
3,724✔
4034
            } else {
4035
                pThreadInfo->buffer = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, true);
146,220✔
4036
                if (g_arguments->check_sql) {
146,842✔
4037
                    pThreadInfo->csql = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, true);
243✔
4038
                    memset(pThreadInfo->csql, 0, TSDB_MAX_ALLOWED_SQL_LEN);
243✔
4039
                }
4040
            }
4041

4042
            break;
150,566✔
4043
        }
4044
        default:
×
4045
            break;
×
4046
    }
4047

4048
END:
181,003✔
4049
    return NULL;
181,003✔
4050
}
4051

4052

4053
// init insert thread
4054
int32_t initInsertThread(SDataBase* database, SSuperTable* stbInfo, int32_t nthreads, threadInfo *infos, int64_t div, int64_t mod) {
38,517✔
4055
    int32_t  ret     = -1;
38,517✔
4056
    uint64_t tbNext  = stbInfo->childTblFrom;
38,517✔
4057
    int32_t  vgNext  = 0;
38,517✔
4058
    pthread_t   *pids  = benchCalloc(1, nthreads * sizeof(pthread_t),  true);
38,517✔
4059
    int threadCnt = 0;
38,517✔
4060
    uint64_t * bind_ts_array = NULL;
38,517✔
4061
    if (STMT2_IFACE == stbInfo->iface || STMT_IFACE == stbInfo->iface) {
38,517✔
4062
        bind_ts_array = benchCalloc(1, sizeof(int64_t)*g_arguments->prepared_rand, true);
4,237✔
4063
        initTsArray(bind_ts_array, stbInfo);
4,237✔
4064
    }
4065

4066

4067
    for (int32_t i = 0; i < nthreads && !g_arguments->terminate; i++) {
219,520✔
4068
        // set table
4069
        threadInfo *pThreadInfo = infos + i;
181,003✔
4070
        pThreadInfo->threadID   = i;
181,003✔
4071
        pThreadInfo->dbInfo     = database;
181,003✔
4072
        pThreadInfo->stbInfo    = stbInfo;
181,003✔
4073
        pThreadInfo->start_time = stbInfo->startTimestamp;
181,003✔
4074
        pThreadInfo->pos        = 0;
181,003✔
4075
        pThreadInfo->samplePos  = 0;
181,003✔
4076
        pThreadInfo->totalInsertRows = 0;
181,003✔
4077
        if (STMT2_IFACE == stbInfo->iface || STMT_IFACE == stbInfo->iface) {
181,003✔
4078
            pThreadInfo->bind_ts_array = benchCalloc(1, sizeof(int64_t)*g_arguments->prepared_rand, true);
18,362✔
4079
            memcpy(pThreadInfo->bind_ts_array, bind_ts_array, sizeof(int64_t)*g_arguments->prepared_rand);
18,362✔
4080

4081
        }
4082
        if (g_arguments->bind_vgroup) {
181,003✔
4083
            for (int32_t j = vgNext; j < database->vgroups; j++) {
1,262✔
4084
                SVGroup *vg = benchArrayGet(database->vgArray, j);
1,262✔
4085
                if (0 == vg->tbCountPerVgId) {
1,262✔
4086
                    continue;
78✔
4087
                }
4088
                pThreadInfo->vg               = vg;
1,184✔
4089
                pThreadInfo->ntables          = vg->tbCountPerVgId;
1,184✔
4090
                pThreadInfo->start_table_from = 0;
1,184✔
4091
                pThreadInfo->end_table_to     = vg->tbCountPerVgId - 1;
1,184✔
4092
                vgNext                        = j + 1;
1,184✔
4093
                break;
1,184✔
4094
            }
4095
        } else {
4096
            pThreadInfo->start_table_from = tbNext;
179,819✔
4097
            pThreadInfo->ntables          = i < mod ? div + 1 : div;
179,819✔
4098
            pThreadInfo->end_table_to     = i < mod ? tbNext + div : tbNext + div - 1;
179,819✔
4099
            tbNext                        = pThreadInfo->end_table_to + 1;
179,819✔
4100
        }
4101

4102
        // init conn
4103
        pthread_create(pids + i, NULL, genInsertTheadInfo,   pThreadInfo);
181,003✔
4104
        threadCnt ++;
181,003✔
4105
    }
4106

4107
    // wait threads
4108
    for (int i = 0; i < threadCnt; i++) {
219,520✔
4109
        infoPrint("init pthread_join %d ...\n", i);
181,003✔
4110
        pthread_join(pids[i], NULL);
181,003✔
4111
    }
4112

4113
    if (bind_ts_array) {
38,517✔
4114
        tmfree(bind_ts_array);
4,237✔
4115
    }
4116

4117
    tmfree(pids);
38,517✔
4118
    if (g_fail) {
38,517✔
4119
       return -1;
×
4120
    }
4121
    return 0;
38,517✔
4122
}
4123

4124
#ifdef LINUX
4125
#define EMPTY_SLOT -1
4126
// run with limit thread
4127
int32_t runInsertLimitThread(SDataBase* database, SSuperTable* stbInfo, int32_t nthreads, int32_t limitThread, threadInfo *infos, pthread_t *pids) {
13✔
4128
    infoPrint("run with bind vgroups limit thread. limit threads=%d nthread=%d\n", limitThread, nthreads);
13✔
4129

4130
    // slots save threadInfo array index
4131
    int32_t* slot = benchCalloc(limitThread, sizeof(int32_t), false);
13✔
4132
    int32_t  t    = 0; // thread index
13✔
4133
    for (int32_t i = 0; i < limitThread; i++) {
39✔
4134
        slot[i] = EMPTY_SLOT;
26✔
4135
    }
4136

4137
    while (!g_arguments->terminate) {
91✔
4138
        int32_t emptySlot = 0;
91✔
4139
        for (int32_t i = 0; i < limitThread; i++) {
273✔
4140
            int32_t idx = slot[i];
182✔
4141
            // check slot thread end
4142
            if(idx != EMPTY_SLOT) {
182✔
4143
                if (pthread_tryjoin_np(pids[idx], NULL) == EBUSY ) {
143✔
4144
                    // thread is running
4145
                    toolsMsleep(2000);
52✔
4146
                } else {
4147
                    // thread is end , set slot empty
4148
                    infoPrint("slot[%d] finished tidx=%d. completed thread count=%d\n", i, slot[i], t);
91✔
4149
                    slot[i] = EMPTY_SLOT;
91✔
4150
                }
4151
            }
4152

4153
            if (slot[i] == EMPTY_SLOT && t < nthreads) {
182✔
4154
                // slot is empty , set new thread to running
4155
                threadInfo *pThreadInfo = infos + t;
91✔
4156
                if (stbInfo->interlaceRows > 0) {
91✔
4157
                    pthread_create(pids + t, NULL, syncWriteInterlace,   pThreadInfo);
×
4158
                } else {
4159
                    pthread_create(pids + t, NULL, syncWriteProgressive, pThreadInfo);
91✔
4160
                }
4161

4162
                // save current and move next
4163
                slot[i] = t;
91✔
4164
                t++;
91✔
4165
                infoPrint("slot[%d] start new thread tidx=%d. \n", i, slot[i]);
91✔
4166
            }
4167

4168
            // check slot empty
4169
            if(slot[i] == EMPTY_SLOT) {
182✔
4170
                emptySlot++;
39✔
4171
            }
4172
        }
4173

4174
        // check all thread end
4175
        if(emptySlot == limitThread) {
91✔
4176
            debugPrint("all threads(%d) is run finished.\n", nthreads);
13✔
4177
            break;
13✔
4178
        } else {
4179
            debugPrint("current thread index=%d all thread=%d\n", t, nthreads);
78✔
4180
        }
4181
    }
4182

4183
    tmfree(slot);
13✔
4184

4185
    return 0;
13✔
4186
}
4187
#endif
4188

4189
// run
4190
int32_t runInsertThread(SDataBase* database, SSuperTable* stbInfo, int32_t nthreads, threadInfo *infos, pthread_t *pids) {
38,504✔
4191
    infoPrint("run insert thread. real nthread=%d\n", nthreads);
38,504✔
4192
    // create threads
4193
    int threadCnt = 0;
38,504✔
4194
    for (int i = 0; i < nthreads && !g_arguments->terminate; i++) {
219,416✔
4195
        threadInfo *pThreadInfo = infos + i;
180,912✔
4196
        if (stbInfo->interlaceRows > 0) {
180,912✔
4197
            pthread_create(pids + i, NULL, syncWriteInterlace,   pThreadInfo);
8,245✔
4198
        } else {
4199
            pthread_create(pids + i, NULL, syncWriteProgressive, pThreadInfo);
172,667✔
4200
        }
4201
        threadCnt ++;
180,912✔
4202
    }
4203

4204
    // wait threads
4205
    for (int i = 0; i < threadCnt; i++) {
219,416✔
4206
        infoPrint("pthread_join %d ...\n", i);
180,912✔
4207
        pthread_join(pids[i], NULL);
180,912✔
4208
    }
4209

4210
    return 0;
38,504✔
4211
}
4212

4213

4214
// exit and free resource
4215
int32_t exitInsertThread(SDataBase* database, SSuperTable* stbInfo, int32_t nthreads, threadInfo *infos, pthread_t *pids, int64_t spend) {
38,517✔
4216

4217
    if (g_arguments->terminate)  toolsMsleep(100);
38,517✔
4218

4219
    BArray *  total_delay_list = benchArrayInit(1, sizeof(int64_t));
38,517✔
4220
    int64_t   totalDelay = 0;
38,517✔
4221
    int64_t   totalDelay1 = 0;
38,517✔
4222
    int64_t   totalDelay2 = 0;
38,517✔
4223
    int64_t   totalDelay3 = 0;
38,517✔
4224
    uint64_t  totalInsertRows = 0;
38,517✔
4225

4226
    // free threads resource
4227
    for (int i = 0; i < nthreads; i++) {
225,888✔
4228
        threadInfo *pThreadInfo = infos + i;
187,371✔
4229
        // free check sql
4230
        if (pThreadInfo->csql) {
187,371✔
4231
            tmfree(pThreadInfo->csql);
243✔
4232
            pThreadInfo->csql = NULL;
243✔
4233
        }
4234

4235
        // close conn
4236
        int protocol = stbInfo->lineProtocol;
187,371✔
4237
        switch (stbInfo->iface) {
187,371✔
4238
            case REST_IFACE:
56✔
4239
                if (g_arguments->terminate)
56✔
4240
                    toolsMsleep(100);
×
4241
                destroySockFd(pThreadInfo->sockfd);
56✔
4242
                if (stbInfo->interlaceRows > 0) {
56✔
4243
                    free_ds(&pThreadInfo->buffer);
×
4244
                } else {
4245
                    tmfree(pThreadInfo->buffer);
56✔
4246
                    pThreadInfo->buffer = NULL;
56✔
4247
                }
4248
                break;
56✔
4249
            case SML_REST_IFACE:
518✔
4250
                if (g_arguments->terminate)
518✔
4251
                    toolsMsleep(100);
×
4252
                tmfree(pThreadInfo->buffer);
518✔
4253
                // on-purpose no break here
4254
            case SML_IFACE:
12,019✔
4255
                if (TSDB_SML_JSON_PROTOCOL != protocol
12,019✔
4256
                        && SML_JSON_TAOS_FORMAT != protocol) {
8,302✔
4257
                    for (int t = 0; t < pThreadInfo->ntables; t++) {
41,022✔
4258
                        tmfree(pThreadInfo->sml_tags[t]);
33,293✔
4259
                    }
4260
                    for (int j = 0; j < g_arguments->reqPerReq; j++) {
8,224,829✔
4261
                        tmfree(pThreadInfo->lines[j]);
8,217,100✔
4262
                    }
4263
                    tmfree(pThreadInfo->sml_tags);
7,729✔
4264
                    pThreadInfo->sml_tags = NULL;
7,729✔
4265
                } else {
4266
                    for (int t = 0; t < pThreadInfo->ntables; t++) {
12,732✔
4267
                        tmfree(pThreadInfo->sml_tags_json_array[t]);
8,442✔
4268
                    }
4269
                    tmfree(pThreadInfo->sml_tags_json_array);
4,290✔
4270
                    pThreadInfo->sml_tags_json_array = NULL;
4,290✔
4271
                    if (pThreadInfo->sml_json_tags) {
4,290✔
4272
                        tools_cJSON_Delete(pThreadInfo->sml_json_tags);
4,290✔
4273
                        pThreadInfo->sml_json_tags = NULL;
4,290✔
4274
                    }
4275
                    if (pThreadInfo->json_array) {
4,290✔
4276
                        tools_cJSON_Delete(pThreadInfo->json_array);
×
4277
                        pThreadInfo->json_array = NULL;
×
4278
                    }
4279
                }
4280
                if (pThreadInfo->lines) {
12,019✔
4281
                    if ((0 == stbInfo->interlaceRows)
12,019✔
4282
                            && (TSDB_SML_JSON_PROTOCOL == protocol)) {
10,960✔
4283
                        tmfree(pThreadInfo->lines[0]);
3,549✔
4284
                        for (int t = 0; t < pThreadInfo->ntables; t++) {
10,578✔
4285
                            tmfree(pThreadInfo->sml_json_value_array[t]);
7,029✔
4286
                        }
4287
                        tmfree(pThreadInfo->sml_json_value_array);
3,549✔
4288
                    }
4289
                    tmfree(pThreadInfo->lines);
12,019✔
4290
                    pThreadInfo->lines = NULL;
12,019✔
4291
                }
4292
                break;
12,019✔
4293

4294
            case STMT_IFACE:
20,381✔
4295
                // close stmt
4296
                if(pThreadInfo->conn && pThreadInfo->conn->stmt) {
20,381✔
4297
                    taos_stmt_close(pThreadInfo->conn->stmt);
14,013✔
4298
                    pThreadInfo->conn->stmt = NULL;
14,013✔
4299
                }
4300
            case STMT2_IFACE:
4301
                // close stmt2
4302
                if (pThreadInfo->conn && pThreadInfo->conn->stmt2) {
24,730✔
4303
                    taos_stmt2_close(pThreadInfo->conn->stmt2);
4,349✔
4304
                    pThreadInfo->conn->stmt2 = NULL;
4,349✔
4305
                }
4306

4307
                tmfree(pThreadInfo->bind_ts);
24,730✔
4308
                tmfree(pThreadInfo->bind_ts_array);
24,730✔
4309
                tmfree(pThreadInfo->bindParams);
24,730✔
4310

4311
                // free tagsStmt
4312
                BArray *tags = pThreadInfo->tagsStmt;
24,730✔
4313
                if(tags) {
24,730✔
4314
                    // free child
4315
                    for (int k = 0; k < tags->size; k++) {
81,745✔
4316
                        Field * tag = benchArrayGet(tags, k);
63,383✔
4317
                        tmfree(tag->stmtData.data);
63,383✔
4318
                        tag->stmtData.data = NULL;
63,383✔
4319
                        tmfree(tag->stmtData.is_null);
63,383✔
4320
                        tag->stmtData.is_null = NULL;
63,383✔
4321
                        tmfree(tag->stmtData.lengths);
63,383✔
4322
                        tag->stmtData.lengths = NULL;
63,383✔
4323
                    }
4324
                    // free parent
4325
                    benchArrayDestroy(tags);
18,362✔
4326
                    pThreadInfo->tagsStmt = NULL;
18,362✔
4327
                }
4328

4329
                // free lengths
4330
                if(pThreadInfo->lengths) {
24,730✔
4331
                    for(int c = 0; c <= stbInfo->cols->size; c++) {
254,776✔
4332
                        tmfree(pThreadInfo->lengths[c]);
236,414✔
4333
                    }
4334
                    free(pThreadInfo->lengths);
18,362✔
4335
                    pThreadInfo->lengths = NULL;
18,362✔
4336
                }
4337
                break;
24,730✔
4338

4339
            case TAOSC_IFACE:
150,566✔
4340
                if (stbInfo->interlaceRows > 0) {
150,566✔
4341
                    free_ds(&pThreadInfo->buffer);
3,724✔
4342
                } else {
4343
                    tmfree(pThreadInfo->buffer);
146,842✔
4344
                    pThreadInfo->buffer = NULL;
146,842✔
4345
                }
4346
                break;
150,566✔
4347

4348
            default:
×
4349
                break;
×
4350
        }
4351
        totalInsertRows += pThreadInfo->totalInsertRows;
187,371✔
4352
        totalDelay += pThreadInfo->totalDelay;
187,371✔
4353
        totalDelay1 += pThreadInfo->totalDelay1;
187,371✔
4354
        totalDelay2 += pThreadInfo->totalDelay2;
187,371✔
4355
        totalDelay3 += pThreadInfo->totalDelay3;
187,371✔
4356
        if (pThreadInfo->delayList != NULL) {
187,371✔
4357
            benchArrayAddBatch(total_delay_list, pThreadInfo->delayList->pData,
181,003✔
4358
                    pThreadInfo->delayList->size, true);
181,003✔
4359
            tmfree(pThreadInfo->delayList);
181,003✔
4360
            pThreadInfo->delayList = NULL;
181,003✔
4361
        }
4362
        //  free conn
4363
        if (pThreadInfo->conn) {
187,371✔
4364
            closeBenchConn(pThreadInfo->conn);
180,429✔
4365
            pThreadInfo->conn = NULL;
180,429✔
4366
        }
4367
    }
4368

4369
    // calculate result
4370
    qsort(total_delay_list->pData, total_delay_list->size,
38,517✔
4371
            total_delay_list->elemSize, compare);
4372

4373
    if (g_arguments->terminate)  toolsMsleep(100);
38,517✔
4374

4375
    tmfree(pids);
38,517✔
4376
    tmfree(infos);
38,517✔
4377

4378
    // print result
4379
    int ret = printTotalDelay(database, totalDelay, totalDelay1, totalDelay2, totalDelay3,
38,517✔
4380
                              total_delay_list, nthreads, totalInsertRows, spend);
4381
    benchArrayDestroy(total_delay_list);
38,517✔
4382
    if (g_fail || ret != 0) {
38,517✔
4383
        return -1;
810✔
4384
    }
4385
    return 0;
37,707✔
4386
}
4387

4388
// Runs one complete insert pass for a super table: validates the arguments,
// decides how many worker threads to use and how the tables are spread over
// them, initialises the workers, runs them while measuring the elapsed time
// and finally hands everything to exitInsertThread() for cleanup/reporting.
//
// Returns 0 on success and a non-zero value on failure.
static int startMultiThreadInsertData(SDataBase* database, SSuperTable* stbInfo) {
    bool schemaless = (stbInfo->iface == SML_IFACE) || (stbInfo->iface == SML_REST_IFACE);
    if (schemaless && !stbInfo->use_metric) {
        errorPrint("%s", "schemaless cannot work without stable\n");
        return -1;
    }

    // check argument valid
    preProcessArgument(stbInfo);

    // number of tables to write
    int64_t tableCount = obtainTableCount(database, stbInfo);
    if (0 == tableCount) {
        errorPrint("insert table count is zero. %s.%s\n", database->dbName, stbInfo->stbName);
        return -1;
    }

    // assign table to thread
    int32_t threadCount = g_arguments->nthreads;
    int64_t perThread   = 0;  // tableCount / threadCount
    int64_t remainder   = 0;  // tableCount % threadCount

    if (g_arguments->bind_vgroup) {
        threadCount = assignTableToThread(database, stbInfo);
        if (0 == threadCount) {
            errorPrint("bind vgroup assign theads count is zero. %s.%s\n", database->dbName, stbInfo->stbName);
            return -1;
        }
    } else {
        if (0 == threadCount) {
            errorPrint("argument thread_count can not be zero. %s.%s\n", database->dbName, stbInfo->stbName);
            return -1;
        }
        perThread = tableCount / threadCount;
        if (perThread < 1) {
            // fewer tables than threads: one thread per table
            threadCount = (int32_t)tableCount;
            perThread   = 1;
        }
        remainder = tableCount % threadCount;
    }

    // per-thread bookkeeping; released by exitInsertThread() on the normal
    // path and right here when initialisation fails
    pthread_t  *threadIds   = benchCalloc(1, threadCount * sizeof(pthread_t),  true);
    threadInfo *threadInfos = benchCalloc(1, threadCount * sizeof(threadInfo), true);

    // init
    int32_t code = initInsertThread(database, stbInfo, threadCount, threadInfos, perThread, remainder);
    if (0 != code) {
        errorPrint("init insert thread failed. %s.%s\n", database->dbName, stbInfo->stbName);
        tmfree(threadIds);
        tmfree(threadInfos);
        return code;
    }

    infoPrint("Estimate memory usage: %.2fMB\n", (double)g_memoryUsage / 1048576);
    prompt(0);

    // run
    int64_t beginUs = toolsGetTimestampUs();
    if (g_arguments->bind_vgroup && g_arguments->nthreads < threadCount) {
        // more workers than allowed threads: run them in batches
#ifdef LINUX
        code = runInsertLimitThread(database, stbInfo, threadCount, g_arguments->nthreads, threadInfos, threadIds);
#else
        code = runInsertThread(database, stbInfo, threadCount, threadInfos, threadIds);
#endif
    } else {
        // every worker gets its own thread at once
        code = runInsertThread(database, stbInfo, threadCount, threadInfos, threadIds);
    }
    int64_t finishUs = toolsGetTimestampUs();

    // never report an elapsed time of zero
    int64_t elapsed = (finishUs == beginUs) ? 1 : (finishUs - beginUs);

    // exit
    code = exitInsertThread(database, stbInfo, threadCount, threadInfos, threadIds, elapsed);
    return code;
}
4473

4474
// Returns the row count reported by "SELECT COUNT(*)" on dbName.stbName,
// narrowed to int; 0 when the query yields no row and -1 when the query
// fails.
static int getStbInsertedRows(char* dbName, char* stbName, TAOS* taos) {
    char sql[SHORT_1K_SQL_BUFF_LEN];
    (void)snprintf(sql, SHORT_1K_SQL_BUFF_LEN, "SELECT COUNT(*) FROM %s.%s",
             dbName, stbName);

    TAOS_RES* result  = taos_query(taos, sql);
    int       errCode = taos_errno(result);
    if (0 != errCode) {
        // NOTE(review): the result is not freed on this path; presumably
        // printErrCmdCodeStr() releases it - confirm.
        printErrCmdCodeStr(sql, errCode, result);
        return -1;
    }

    TAOS_ROW firstRow = taos_fetch_row(result);
    int count = (NULL == firstRow) ? 0 : (int)*(int64_t*)firstRow[0];

    taos_free_result(result);
    return count;
}
4494

4495
// Builds and executes the "CREATE sma INDEX" statement for one tsma on
// stbName, appending tsma->custom when it is set.
//
// The statement is dropped (with an error message) instead of being executed
// when it does not fit into the local buffer; nothing is written past the
// buffer in that case.
static void create_tsma(TSMA* tsma, SBenchConn* conn, char* stbName) {
    char command[SHORT_1K_SQL_BUFF_LEN];
    int len = snprintf(command, SHORT_1K_SQL_BUFF_LEN,
                       "CREATE sma INDEX %s ON %s function(%s) "
                       "INTERVAL (%s) SLIDING (%s)",
                       tsma->name, stbName, tsma->func,
                       tsma->interval, tsma->sliding);
    // snprintf returns the length it WOULD have written; a value at or above
    // the buffer size means truncation, and command + len would then point
    // outside of the buffer
    if (len < 0 || len >= SHORT_1K_SQL_BUFF_LEN) {
        errorPrint("create tsma command for %s is invalid or too long\n", tsma->name);
        return;
    }
    if (tsma->custom) {
        int remain = SHORT_1K_SQL_BUFF_LEN - len;
        int extra  = snprintf(command + len, remain, " %s", tsma->custom);
        if (extra < 0 || extra >= remain) {
            errorPrint("create tsma command for %s is invalid or too long\n", tsma->name);
            return;
        }
    }
    int code = queryDbExecCall(conn, command);
    if (code == 0) {
        infoPrint("successfully create tsma with command <%s>\n", command);
    }
}
×
4511

4512
static void* create_tsmas(void* args) {
×
4513
    tsmaThreadInfo* pThreadInfo = (tsmaThreadInfo*) args;
×
4514
    int inserted_rows = 0;
×
4515
    SBenchConn* conn = initBenchConn(pThreadInfo->dbName);
×
4516
    if (NULL == conn) {
×
4517
        return NULL;
×
4518
    }
4519
    int finished = 0;
×
4520
    if (taos_select_db(conn->taos, pThreadInfo->dbName)) {
×
4521
        errorPrint("failed to use database (%s)\n", pThreadInfo->dbName);
×
4522
        closeBenchConn(conn);
×
4523
        return NULL;
×
4524
    }
4525
    while (finished < pThreadInfo->tsmas->size && inserted_rows >= 0) {
×
4526
        inserted_rows = (int)getStbInsertedRows(
×
4527
                pThreadInfo->dbName, pThreadInfo->stbName, conn->taos);
4528
        for (int i = 0; i < pThreadInfo->tsmas->size; i++) {
×
4529
            TSMA* tsma = benchArrayGet(pThreadInfo->tsmas, i);
×
4530
            if (!tsma->done &&  inserted_rows >= tsma->start_when_inserted) {
×
4531
                create_tsma(tsma, conn, pThreadInfo->stbName);
×
4532
                tsma->done = true;
×
4533
                finished++;
×
4534
                break;
×
4535
            }
4536
        }
4537
        toolsMsleep(10);
×
4538
    }
4539
    benchArrayDestroy(pThreadInfo->tsmas);
×
4540
    closeBenchConn(conn);
×
4541
    return NULL;
×
4542
}
4543

4544
void changeGlobalIface() {
34,789✔
4545
    if (g_arguments->databases->size == 1) {
34,789✔
4546
            SDataBase *db = benchArrayGet(g_arguments->databases, 0);
34,776✔
4547
            if (db && db->superTbls->size == 1) {
34,776✔
4548
                SSuperTable *stb = benchArrayGet(db->superTbls, 0);
32,462✔
4549
                if (stb) {
32,462✔
4550
                    if(g_arguments->iface != stb->iface) {
32,462✔
4551
                        infoPrint("only 1 db 1 super table, g_arguments->iface(%d) replace with stb->iface(%d) \n", g_arguments->iface, stb->iface);
1,496✔
4552
                        g_arguments->iface = stb->iface;
1,496✔
4553
                    }
4554
                }
4555
            }
4556
    }
4557
}
34,789✔
4558

4559
int insertTestProcess() {
34,789✔
4560
    prompt(0);
34,789✔
4561

4562
    encodeAuthBase64();
34,789✔
4563
    // if only one stable, global iface same with stable->iface
4564
    changeGlobalIface();
34,789✔
4565

4566
    // move from loop to here
4567
    if (isRest(g_arguments->iface)) {
34,789✔
4568
        if (0 != convertServAddr(g_arguments->iface,
188✔
4569
                                 false,
4570
                                 1)) {
4571
            return -1;
×
4572
        }
4573
    }
4574

4575
    //loop create database
4576
    for (int i = 0; i < g_arguments->databases->size; i++) {
69,218✔
4577
        SDataBase * database = benchArrayGet(g_arguments->databases, i);
34,802✔
4578

4579
        if (database->drop && !(g_arguments->supplementInsert)) {
34,802✔
4580
            if (database->superTbls && database->superTbls->size > 0) {
28,552✔
4581
                SSuperTable * stbInfo = benchArrayGet(database->superTbls, 0);
28,552✔
4582
                if (stbInfo && isRest(stbInfo->iface)) {
28,552✔
4583
                    if (0 != convertServAddr(stbInfo->iface,
244✔
4584
                                             stbInfo->tcpTransfer,
244✔
4585
                                             stbInfo->lineProtocol)) {
244✔
4586
                        return -1;
×
4587
                    }
4588
                }
4589
            }
4590

4591
            if (createDatabase(database)) {
28,552✔
4592
                errorPrint("failed to create database (%s)\n",
373✔
4593
                        database->dbName);
4594
                return -1;
373✔
4595
            }
4596
            succPrint("created database (%s)\n", database->dbName);
28,179✔
4597
        } else if(g_arguments->bind_vgroup) {
6,250✔
4598
            // database already exist, get vgroups from server
4599
            SBenchConn* conn = initBenchConn(NULL);
128✔
4600
            if (conn) {
128✔
4601
                int32_t vgroups = getVgroupsNative(conn, database);
128✔
4602
                if (vgroups <=0) {
128✔
4603
                    closeBenchConn(conn);
×
4604
                    errorPrint("Database %s's vgroups is zero , db exist case.\n", database->dbName);
×
4605
                    return -1;
×
4606
                }
4607
                closeBenchConn(conn);
128✔
4608
                succPrint("Database (%s) get vgroups num is %d from server.\n", database->dbName, vgroups);
128✔
4609
            }
4610
        }
4611
    }
4612

4613
    // create super table && fill child tables && prepareSampleData
4614
    for (int i = 0; i < g_arguments->databases->size; i++) {
68,845✔
4615
        SDataBase * database = benchArrayGet(g_arguments->databases, i);
34,429✔
4616
        if (database->superTbls) {
34,429✔
4617
            for (int j = 0; j < database->superTbls->size; j++) {
75,248✔
4618
                SSuperTable * stbInfo = benchArrayGet(database->superTbls, j);
40,819✔
4619
                if (stbInfo->iface != SML_IFACE
40,819✔
4620
                        && stbInfo->iface != SML_REST_IFACE
37,592✔
4621
                        && !stbInfo->childTblExists) {
37,396✔
4622
                    int code = getSuperTableFromServer(database, stbInfo);
36,361✔
4623
                    if (code == TSDB_CODE_FAILED) {
36,361✔
4624
                        return -1;
×
4625
                    }
4626

4627
                    // with create table if not exists, so if exist, can not report failed
4628
                    if (createSuperTable(database, stbInfo)) {
36,361✔
4629
                        return -1;
×
4630
                    }
4631

4632
                }
4633
                // fill last ts from super table
4634
                if(stbInfo->autoFillback && stbInfo->childTblExists) {
40,819✔
4635
                    fillSTableLastTs(database, stbInfo);
×
4636
                }
4637

4638
                // calc now
4639
                if(stbInfo->calcNow) {
40,819✔
4640
                    calcExprFromServer(database, stbInfo);
1,668✔
4641
                }
4642

4643
                // check fill child table count valid
4644
                if(fillChildTblName(database, stbInfo) <= 0) {
40,819✔
4645
                    infoPrint(" warning fill childs table count is zero, db:%s stb: %s \n", database->dbName, stbInfo->stbName);
411✔
4646
                }
4647
                if (0 != prepareSampleData(database, stbInfo)) {
40,819✔
4648
                    return -1;
×
4649
                }
4650

4651
                // early malloc buffer for auto create table
4652
                if((stbInfo->iface == STMT_IFACE || stbInfo->iface == STMT2_IFACE) && stbInfo->autoTblCreating) {
40,819✔
4653
                    prepareTagsStmt(stbInfo);
777✔
4654
                }
4655

4656
                // execute sqls
4657
                if (stbInfo->sqls) {
40,819✔
4658
                    char **sqls = stbInfo->sqls;
660✔
4659
                    while (*sqls) {
3,324✔
4660
                        queryDbExec(database, stbInfo, *sqls);
2,664✔
4661
                        sqls++;
2,664✔
4662
                    }
4663
                }
4664
            }
4665
        }
4666
    }
4667

4668
    // tsma
4669
    if (g_arguments->taosc_version == 3) {
34,416✔
4670
        for (int i = 0; i < g_arguments->databases->size; i++) {
68,845✔
4671
            SDataBase* database = benchArrayGet(g_arguments->databases, i);
34,429✔
4672
            if (database->superTbls) {
34,429✔
4673
                for (int j = 0; (j < database->superTbls->size
73,461✔
4674
                        && !g_arguments->terminate); j++) {
114,280✔
4675
                    SSuperTable* stbInfo =
4676
                        benchArrayGet(database->superTbls, j);
40,819✔
4677
                    if (stbInfo->tsmas == NULL) {
40,819✔
4678
                        continue;
17,053✔
4679
                    }
4680
                    if (stbInfo->tsmas->size > 0) {
23,766✔
4681
                        tsmaThreadInfo* pThreadInfo =
4682
                            benchCalloc(1, sizeof(tsmaThreadInfo), true);
×
4683
                        pthread_t tsmas_pid = {0};
×
4684
                        pThreadInfo->dbName = database->dbName;
×
4685
                        pThreadInfo->stbName = stbInfo->stbName;
×
4686
                        pThreadInfo->tsmas = stbInfo->tsmas;
×
4687
                        pthread_create(&tsmas_pid, NULL,
×
4688
                                       create_tsmas, pThreadInfo);
4689
                    }
4690
                }
4691
            }
4692
        }
4693
    }
4694

4695
    if (createChildTables()) return -1;
34,416✔
4696

4697
    // create sub threads for inserting data
4698
    for (int i = 0; i < g_arguments->databases->size; i++) {
64,798✔
4699
        SDataBase * database = benchArrayGet(g_arguments->databases, i);
32,837✔
4700
        if (database->superTbls) {
32,837✔
4701
            for (uint64_t j = 0; j < database->superTbls->size; j++) {
71,173✔
4702
                SSuperTable * stbInfo = benchArrayGet(database->superTbls, j);
39,199✔
4703
                if (stbInfo->insertRows == 0) {
39,199✔
4704
                    continue;
629✔
4705
                }
4706
                prompt(stbInfo->non_stop);
38,570✔
4707
                if (startMultiThreadInsertData(database, stbInfo)) {
38,570✔
4708
                    return -1;
863✔
4709
                }
4710
            }
4711
        }
4712
    }
4713
    return 0;
31,961✔
4714
}
4715

4716
//
4717
//     ------- STMT 2 -----------
4718
//
4719

4720
static int32_t stmt2BindAndSubmit(
251,480✔
4721
        threadInfo *pThreadInfo,
4722
        SChildTable *childTbl,
4723
        int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt, int64_t *delay1,
4724
        int64_t *delay3, int64_t* startTs, int64_t* endTs, int32_t w) {
4725

4726
    // create bindV
4727
    int32_t count            = 1;
251,480✔
4728
    TAOS_STMT2_BINDV * bindv = createBindV(count, 0, 0);
251,480✔
4729
    TAOS_STMT2 *stmt2        = pThreadInfo->conn->stmt2;
251,537✔
4730
    SSuperTable *stbInfo     = pThreadInfo->stbInfo;
251,586✔
4731

4732
    //
4733
    // bind
4734
    //
4735

4736
    // count
4737
    bindv->count = 1;
251,586✔
4738
    // tbnames
4739
    bindv->tbnames[0] = childTbl->name;
251,586✔
4740
    // tags
4741
    //bindv->tags[0] = NULL; // Progrssive mode tag put on prepare sql, no need put here
4742

4743
    // bind_cols
4744
    uint32_t batch = (g_arguments->reqPerReq > stbInfo->insertRows - i) ? (stbInfo->insertRows - i) : g_arguments->reqPerReq;
251,586✔
4745
    int32_t n = 0;
251,635✔
4746
    int64_t pos = i % g_arguments->prepared_rand;
251,635✔
4747

4748
    // adjust batch about pos
4749
    if(g_arguments->prepared_rand - pos < batch ) {
251,635✔
4750
        debugPrint("prepared_rand(%" PRId64 ") is not a multiple of num_of_records_per(%d), the batch size can be modify. before=%d after=%d\n",
55,692✔
4751
                    (int64_t)g_arguments->prepared_rand, (int32_t)g_arguments->reqPerReq, (int32_t)batch, (int32_t)(g_arguments->prepared_rand - pos));
4752
        batch = g_arguments->prepared_rand - pos;
55,720✔
4753
    }
4754

4755
    if (batch == 0) {
251,663✔
4756
        infoPrint("batch size is zero. pos = %"PRId64"\n", pos);
×
4757
        return 0;
×
4758
    }
4759

4760
    uint32_t generated = bindVColsProgressive(bindv, 0, pThreadInfo, batch, *timestamp, pos, childTbl, pkCur, pkCnt, &n);
251,663✔
4761
    if(generated == 0) {
251,572✔
4762
        errorPrint( "get cols data bind information failed. table: %s\n", childTbl->name);
×
4763
        freeBindV(bindv);
×
4764
        return -1;
×
4765
    }
4766
    *timestamp += n * stbInfo->timestamp_step;
251,572✔
4767

4768
    if (g_arguments->debug_print) {
251,621✔
4769
        showBindV(bindv, stbInfo->tags, stbInfo->cols);
260✔
4770
    }
4771

4772
    // bind and submit
4773
    int32_t code = submitStmt2(pThreadInfo, bindv, delay1, delay3, startTs, endTs, &generated, w);
251,621✔
4774
    // free
4775
    freeBindV(bindv);
251,600✔
4776

4777
    if(code != 0) {
251,579✔
4778
        errorPrint( "failed submitStmt2() progressive mode, table: %s . engine error: %s\n", childTbl->name, taos_stmt2_error(stmt2));
×
4779
        return code;
×
4780
    } else {
4781
        debugPrint("succ submitStmt2 progressive mode. table=%s batch=%d pos=%" PRId64 " ts=%" PRId64 " generated=%d\n",
251,579✔
4782
                childTbl->name, batch, pos, *timestamp, generated);
4783
        return generated;
251,635✔
4784
    }
4785
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc