• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4720

08 Sep 2025 08:43AM UTC coverage: 58.139% (-0.6%) from 58.762%
#4720

push

travis-ci

web-flow
Merge pull request #32881 from taosdata/enh/add-new-windows-ci

fix(ci): update workflow reference to use new Windows CI YAML

133181 of 292179 branches covered (45.58%)

Branch coverage included in aggregate %.

201691 of 283811 relevant lines covered (71.07%)

5442780.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.07
/tools/taos-tools/src/benchInsert.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the MIT license as published by the Free Software
6
 * Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 */
12

13
#include <bench.h>
14
#include "benchLog.h"
15
#include "wrapDb.h"
16
#include <benchData.h>
17
#include <benchInsertMix.h>
18

19
static int32_t stmt2BindAndSubmit(
20
        threadInfo *pThreadInfo,
21
        SChildTable *childTbl,
22
        int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt, int64_t *delay1,
23
        int64_t *delay3, int64_t* startTs, int64_t* endTs, int32_t w);
24
TAOS_STMT2* initStmt2(TAOS* taos, bool single);        
25

26
/*
 * Error-exit helper: releases the caller's local `pids` and `infos`
 * buffers and returns -1 from the enclosing function.
 * Both identifiers must be in scope at the expansion site.
 */
#define FREE_PIDS_INFOS_RETURN_MINUS_1() \
    do {                                 \
        tmfree(pids);                    \
        tmfree(infos);                   \
        return -1;                       \
    } while (0)
32

33
/*
 * Releases the resources held at the expansion site: closes
 * pThreadInfo->conn when it is set, destroys pThreadInfo->delayList and
 * frees the local `pids` / `infos` buffers.
 * `pThreadInfo`, `pids` and `infos` must be in scope where this is used.
 *
 * The final line deliberately carries no trailing backslash, so the line
 * that follows the macro can never be spliced into its body.
 */
#define FREE_RESOURCE()                             \
    do {                                            \
        if (pThreadInfo->conn)                      \
            closeBenchConn(pThreadInfo->conn);      \
        benchArrayDestroy(pThreadInfo->delayList);  \
        tmfree(pids);                               \
        tmfree(infos);                              \
    } while (0)
41

42
// REST
43
static int getSuperTableFromServerRest(
1✔
44
    SDataBase* database, SSuperTable* stbInfo, char *command) {
45

46
    // TODO(zero): it will create super table based on this error code.
47
    return TSDB_CODE_NOT_FOUND;
1✔
48
    // TODO(me): finish full implementation
49
#if 0
50
    int sockfd = createSockFd();
51
    if (sockfd < 0) {
52
        return -1;
53
    }
54

55
    int code = postProcessSql(command,
56
                         database->dbName,
57
                         database->precision,
58
                         REST_IFACE,
59
                         0,
60
                         g_arguments->port,
61
                         false,
62
                         sockfd,
63
                         NULL);
64

65
    destroySockFd(sockfd);
66
#endif   // 0
67
}
68

69
static int getSuperTableFromServerTaosc(
210✔
70
        SDataBase *database, SSuperTable *stbInfo, char *command) {
71
    TAOS_RES *res;
72
    TAOS_ROW row = NULL;
210✔
73
    SBenchConn *conn = initBenchConn(database->dbName);
210✔
74
    if (NULL == conn) {
210!
75
        return TSDB_CODE_FAILED;
×
76
    }
77

78
    res = taos_query(conn->taos, command);
210✔
79
    int32_t code = taos_errno(res);
210✔
80
    if (code != 0) {
210✔
81
        infoPrint("stable %s does not exist, will create one\n",
207✔
82
                  stbInfo->stbName);
83
        closeBenchConn(conn);
207✔
84
        return TSDB_CODE_NOT_FOUND;
207✔
85
    }
86
    infoPrint("find stable<%s>, will get meta data from server\n",
3✔
87
              stbInfo->stbName);
88

89
    // Check if the the existing super table matches exactly with the definitions in the JSON file.
90
    // If a hash table were used, the time complexity would be only O(n).
91
    // But taosBenchmark does not incorporate a hash table, the time complexity of the loop traversal is O(n^2).
92
    bool isTitleRow = true;
3✔
93
    uint32_t tag_count = 0;
3✔
94
    uint32_t col_count = 0;
3✔
95

96
    int fieldsNum = taos_num_fields(res);
3✔
97
    TAOS_FIELD_E* fields = taos_fetch_fields_e(res);
3✔
98

99
    if (fieldsNum < TSDB_MAX_DESCRIBE_METRIC || !fields) {
3!
100
        errorPrint("%s", "failed to fetch fields\n");
×
101
        taos_free_result(res);
×
102
        closeBenchConn(conn);
×
103
        return TSDB_CODE_FAILED;
×
104
    }
105

106
    while ((row = taos_fetch_row(res)) != NULL) {
19✔
107
        if (isTitleRow) {
16✔
108
            isTitleRow = false;
3✔
109
            continue;
3✔
110
        }
111
        int32_t *lengths = taos_fetch_lengths(res);
13✔
112
        if (lengths == NULL) {
13!
113
            errorPrint("%s", "failed to execute taos_fetch_length\n");
×
114
            taos_free_result(res);
×
115
            closeBenchConn(conn);
×
116
            return TSDB_CODE_FAILED;
×
117
        }
118
        if (strncasecmp((char *) row[TSDB_DESCRIBE_METRIC_NOTE_INDEX], "tag",
13✔
119
                        strlen("tag")) == 0) {
120
            if (stbInfo->tags == NULL || stbInfo->tags->size == 0 || tag_count >= stbInfo->tags->size) {
6!
121
                errorPrint("%s", "existing stable tag mismatched with that defined in JSON\n");
×
122
                taos_free_result(res);
×
123
                closeBenchConn(conn);
×
124
                return TSDB_CODE_FAILED;
×
125
            }
126
            uint8_t tagType = convertStringToDatatype(
12✔
127
                    (char *) row[TSDB_DESCRIBE_METRIC_TYPE_INDEX],
6✔
128
                    lengths[TSDB_DESCRIBE_METRIC_TYPE_INDEX], &(fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].precision));
6✔
129
            char *tagName = (char *) row[TSDB_DESCRIBE_METRIC_FIELD_INDEX];
6✔
130
            if (!searchBArray(stbInfo->tags, tagName,
6!
131
                              lengths[TSDB_DESCRIBE_METRIC_FIELD_INDEX], tagType)) {
132
                errorPrint("existing stable tag:%s not found in JSON tags.\n", tagName);
×
133
                taos_free_result(res);
×
134
                closeBenchConn(conn);
×
135
                return TSDB_CODE_FAILED;
×
136
            }
137
            tag_count += 1;
6✔
138
        } else {
139
            if (stbInfo->cols == NULL || stbInfo->cols->size == 0 || col_count >= stbInfo->cols->size) {
7!
140
                errorPrint("%s", "existing stable column mismatched with that defined in JSON\n");
×
141
                taos_free_result(res);
×
142
                closeBenchConn(conn);
×
143
                return TSDB_CODE_FAILED;
×
144
            }
145
            uint8_t colType = convertStringToDatatype(
14✔
146
                    (char *) row[TSDB_DESCRIBE_METRIC_TYPE_INDEX],
7✔
147
                    lengths[TSDB_DESCRIBE_METRIC_TYPE_INDEX], &(fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].precision));
7✔
148
            char * colName = (char *) row[TSDB_DESCRIBE_METRIC_FIELD_INDEX];
7✔
149
            if (!searchBArray(stbInfo->cols, colName,
7!
150
                              lengths[TSDB_DESCRIBE_METRIC_FIELD_INDEX], colType)) {
151
                errorPrint("existing stable col:%s not found in JSON cols.\n", colName);
×
152
                taos_free_result(res);
×
153
                closeBenchConn(conn);
×
154
                return TSDB_CODE_FAILED;
×
155
            }
156
            col_count += 1;
7✔
157
        }
158
    }  // end while
159
    taos_free_result(res);
3✔
160
    closeBenchConn(conn);
3✔
161
    if (tag_count != stbInfo->tags->size) {
3!
162
        errorPrint("%s", "existing stable tag mismatched with that defined in JSON\n");
×
163
        return TSDB_CODE_FAILED;
×
164
    }
165

166
    if (col_count != stbInfo->cols->size) {
3!
167
        errorPrint("%s", "existing stable column mismatched with that defined in JSON\n");
×
168
        return TSDB_CODE_FAILED;
×
169
    }
170

171
    return TSDB_CODE_SUCCESS;
3✔
172
}
173

174

175
/**
 * Build a DESCRIBE statement for the super table and hand it to the
 * interface-specific lookup (REST when stbInfo->iface is REST_IFACE,
 * native otherwise).
 *
 * @param database  database that owns the super table
 * @param stbInfo   super table definition
 * @return whatever the selected lookup function returns
 */
static int getSuperTableFromServer(SDataBase* database, SSuperTable* stbInfo) {
    char describeSql[SHORT_1K_SQL_BUFF_LEN] = "\0";

    snprintf(describeSql, SHORT_1K_SQL_BUFF_LEN,
             "DESCRIBE `%s`.`%s`", database->dbName,
             stbInfo->stbName);

    if (REST_IFACE == stbInfo->iface) {
        return getSuperTableFromServerRest(database, stbInfo, describeSql);
    }
    return getSuperTableFromServerTaosc(database, stbInfo, describeSql);
}
190

191
static int queryDbExec(SDataBase *database,
268✔
192
                       SSuperTable *stbInfo, char *command) {
193
    int ret = 0;
268✔
194
    if (isRest(stbInfo->iface)) {
268✔
195
        if (0 != convertServAddr(stbInfo->iface, false, 1)) {
1!
196
            errorPrint("%s", "Failed to convert server address\n");
×
197
            return -1;
×
198
        }
199
        int sockfd = createSockFd();
1✔
200
        if (sockfd < 0) {
1!
201
            ret = -1;
×
202
        } else {
203
            ret = queryDbExecRest(command,
1✔
204
                              database->dbName,
205
                              database->precision,
206
                              stbInfo->iface,
1✔
207
                              stbInfo->lineProtocol,
1✔
208
                              stbInfo->tcpTransfer,
1✔
209
                              sockfd);
210
            destroySockFd(sockfd);
1✔
211
        }
212
    } else {
213
        SBenchConn* conn = initBenchConn(database->dbName);
267✔
214
        if (NULL == conn) {
267!
215
            ret = -1;
×
216
        } else {
217
            ret = queryDbExecCall(conn, command);
267✔
218
            int32_t trying = g_arguments->keep_trying;
267✔
219
            while (ret && trying) {
267!
220
                infoPrint("will sleep %"PRIu32" milliseconds then re-execute command: %s\n",
×
221
                          g_arguments->trying_interval, command);
222
                toolsMsleep(g_arguments->trying_interval);
×
223
                ret = queryDbExecCall(conn, command);
×
224
                if (trying != -1) {
×
225
                    trying--;
×
226
                }
227
            }
228
            if (0 != ret) {
267!
229
                ret = -1;
×
230
            }
231
            closeBenchConn(conn);
267✔
232
        }
233
    }
234

235
    return ret;
268✔
236
}
237

238
/**
 * Render the compression-related options of a column into buf.
 *
 * For each of col->encode, col->compress and col->level that is non-empty,
 * appends "<keyword> '<value>' " (with a trailing space) in that order.
 *
 * NOTE(review): the writes are unbounded sprintf() calls; the caller must
 * supply a buffer large enough for all three clauses.
 *
 * @param col  column whose options are rendered
 * @param buf  destination buffer
 * @return number of characters written (0 when all three options are empty)
 */
int getCompressStr(Field* col, char* buf) {
    int written = 0;

    if (col->encode[0] != '\0') {
        written += sprintf(buf + written, "encode \'%s\' ", col->encode);
    }
    if (col->compress[0] != '\0') {
        written += sprintf(buf + written, "compress \'%s\' ", col->compress);
    }
    if (col->level[0] != '\0') {
        written += sprintf(buf + written, "level \'%s\' ", col->level);
    }

    return written;
}
252

253
/*
 * Bookkeeping for a chain of snprintf() appends: when `written` is a valid
 * snprintf result that fitted into `room` bytes, advance *pos and return
 * true; otherwise leave *pos alone and return false.
 */
static bool advanceIfFits(int written, int64_t room, int *pos) {
    if (written < 0 || (int64_t)written >= room) {
        return false;
    }
    *pos += written;
    return true;
}

/**
 * Build and execute the CREATE TABLE statement for a super table.
 *
 * Steps:
 *  1. Returns 0 immediately when g_arguments->supplementInsert is set.
 *  2. Renders the column list (with optional primary-key marker on the first
 *     column and per-column compression options) and stores
 *     "(<primaryKeyName> timestamp<columns>)" in
 *     stbInfo->colsOfCreateChildTable.
 *  3. When the definition has no tags, stops here and returns 0 without
 *     sending anything to the server.
 *  4. Renders the tag list. A JSON tag ends the list: tags after it are not
 *     rendered.
 *  5. Appends the optional COMMENT / DELAY / FILE_FACTOR / ROLLUP /
 *     MAX_DELAY / WATERMARK / SMA clauses and runs the statement through
 *     queryDbExec().
 *
 * Every snprintf() append is checked; any truncation aborts with -1 rather
 * than continuing with a partially written statement. All temporary buffers
 * are released on every exit path.
 *
 * @param database  database in which the super table is created
 * @param stbInfo   super table definition; colsOfCreateChildTable is set
 * @return 0 on success (or nothing to do), -1 on invalid definition,
 *         buffer overflow, or the queryDbExec() failure value
 */
static int createSuperTable(SDataBase* database, SSuperTable* stbInfo) {
    if (g_arguments->supplementInsert) {
        return 0;
    }

    int       ret = -1;
    int       n = 0;
    int       len = 0;
    int       length = 0;
    int       tagIndex = 0;
    bool      fits = true;
    bool      endsWithJson = false;
    bool      first_sma = true;
    uint32_t  tag_buffer_len = 0;
    char     *tagsBuf = NULL;

    uint32_t  col_buffer_len = (TSDB_COL_NAME_LEN + 15 + COMP_NAME_LEN*3) * stbInfo->cols->size;
    char     *colsBuf = benchCalloc(1, col_buffer_len, false);
    char     *command = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, false);

    for (int colIndex = 0; colIndex < stbInfo->cols->size; colIndex++) {
        Field * col = benchArrayGet(stbInfo->cols, colIndex);

        if (col->type == TSDB_DATA_TYPE_GEOMETRY && col->length < 21) {
            errorPrint("%s() LN%d, geometry filed len must be greater than 21 on %d\n", __func__, __LINE__,
                       colIndex);
            goto cleanup;
        }

        if (col->type == TSDB_DATA_TYPE_BINARY ||
            col->type == TSDB_DATA_TYPE_NCHAR ||
            col->type == TSDB_DATA_TYPE_VARBINARY ||
            col->type == TSDB_DATA_TYPE_GEOMETRY) {
            n = snprintf(colsBuf + len, col_buffer_len - len,
                    ",%s %s(%d)", col->name,
                    convertDatatypeToString(col->type), col->length);
        } else if (col->type == TSDB_DATA_TYPE_DECIMAL
                || col->type == TSDB_DATA_TYPE_DECIMAL64) {
            n = snprintf(colsBuf + len, col_buffer_len - len,
                    ",%s %s(%d,%d)", col->name,
                    convertDatatypeToString(col->type), col->precision, col->scale);
        } else {
            n = snprintf(colsBuf + len, col_buffer_len - len,
                    ",%s %s", col->name,
                    convertDatatypeToString(col->type));
        }
        fits = advanceIfFits(n, (int64_t)col_buffer_len - len, &len);

        // primary key
        if (fits && stbInfo->primary_key && colIndex == 0) {
            n = snprintf(colsBuf + len, col_buffer_len - len, " %s", PRIMARY_KEY);
            fits = advanceIfFits(n, (int64_t)col_buffer_len - len, &len);
        }

        // compress key
        // Extra 32 bytes cover the literal text getCompressStr() adds around
        // the three values ("encode '' compress '' level '' " plus NUL).
        // NOTE(review): assumes each value is shorter than COMP_NAME_LEN - confirm.
        char keys[COMP_NAME_LEN*3 + 32] = "";
        if (fits && getCompressStr(col, keys) > 0) {
            n = snprintf(colsBuf + len, col_buffer_len - len, " %s", keys);
            fits = advanceIfFits(n, (int64_t)col_buffer_len - len, &len);
        }

        if (!fits) {
            errorPrint("%s() LN%d, snprintf overflow on %d\n",
                       __func__, __LINE__, colIndex);
            goto cleanup;
        }
    }

    // save for creating child table
    stbInfo->colsOfCreateChildTable =
        (char *)benchCalloc(len + TIMESTAMP_BUFF_LEN, 1, true);

    snprintf(stbInfo->colsOfCreateChildTable, len + TIMESTAMP_BUFF_LEN,
             "(%s timestamp%s)", stbInfo->primaryKeyName, colsBuf);

    if (stbInfo->tags->size == 0) {
        ret = 0;
        goto cleanup;
    }

    tag_buffer_len = (TSDB_COL_NAME_LEN + 15) * stbInfo->tags->size;
    tagsBuf = benchCalloc(1, tag_buffer_len, false);
    len = 0;

    n = snprintf(tagsBuf + len, tag_buffer_len - len, "(");
    if (!advanceIfFits(n, (int64_t)tag_buffer_len - len, &len)) {
        errorPrint("%s() LN%d snprintf overflow\n",
                       __func__, __LINE__);
        goto cleanup;
    }

    for (tagIndex = 0; tagIndex < stbInfo->tags->size; tagIndex++) {
        Field *tag = benchArrayGet(stbInfo->tags, tagIndex);

        if (tag->type == TSDB_DATA_TYPE_GEOMETRY && tag->length < 21) {
            errorPrint("%s() LN%d, geometry filed len must be greater than 21 on %d\n", __func__, __LINE__,
                       tagIndex);
            goto cleanup;
        }

        if (tag->type == TSDB_DATA_TYPE_BINARY ||
            tag->type == TSDB_DATA_TYPE_NCHAR ||
            tag->type == TSDB_DATA_TYPE_VARBINARY ||
            tag->type == TSDB_DATA_TYPE_GEOMETRY) {
            n = snprintf(tagsBuf + len, tag_buffer_len - len,
                    "%s %s(%d),", tag->name,
                    convertDatatypeToString(tag->type), tag->length);
        } else if (tag->type == TSDB_DATA_TYPE_JSON) {
            // no trailing comma: a JSON tag closes the tag list
            n = snprintf(tagsBuf + len, tag_buffer_len - len,
                    "%s json", tag->name);
            endsWithJson = true;
        } else {
            n = snprintf(tagsBuf + len, tag_buffer_len - len,
                    "%s %s,", tag->name,
                    convertDatatypeToString(tag->type));
        }

        if (!advanceIfFits(n, (int64_t)tag_buffer_len - len, &len)) {
            errorPrint("%s() LN%d snprintf overflow on %d\n",
                       __func__, __LINE__, tagIndex);
            goto cleanup;
        }
        if (endsWithJson) {
            break;
        }
    }
    if (!endsWithJson) {
        len -= 1;  // drop the trailing comma of the last rendered tag
    }
    snprintf(tagsBuf + len, tag_buffer_len - len, ")");

    n = snprintf(
        command, TSDB_MAX_ALLOWED_SQL_LEN,
        g_arguments->escape_character
            ? "CREATE TABLE IF NOT EXISTS `%s`.`%s` (%s TIMESTAMP%s) TAGS %s"
            : "CREATE TABLE IF NOT EXISTS %s.%s (%s TIMESTAMP%s) TAGS %s",
        database->dbName, stbInfo->stbName, stbInfo->primaryKeyName, colsBuf, tagsBuf);
    fits = advanceIfFits(n, (int64_t)TSDB_MAX_ALLOWED_SQL_LEN, &length);

    if (fits && stbInfo->comment != NULL) {
        n = snprintf(command + length, TSDB_MAX_ALLOWED_SQL_LEN - length,
                     " COMMENT '%s'", stbInfo->comment);
        fits = advanceIfFits(n, (int64_t)TSDB_MAX_ALLOWED_SQL_LEN - length, &length);
    }
    if (fits && stbInfo->delay >= 0) {
        n = snprintf(command + length,
                     TSDB_MAX_ALLOWED_SQL_LEN - length, " DELAY %d",
                     stbInfo->delay);
        fits = advanceIfFits(n, (int64_t)TSDB_MAX_ALLOWED_SQL_LEN - length, &length);
    }
    if (fits && stbInfo->file_factor >= 0) {
        n = snprintf(command + length,
                     TSDB_MAX_ALLOWED_SQL_LEN - length, " FILE_FACTOR %f",
                     (float)stbInfo->file_factor / 100);
        fits = advanceIfFits(n, (int64_t)TSDB_MAX_ALLOWED_SQL_LEN - length, &length);
    }
    if (fits && stbInfo->rollup != NULL) {
        n = snprintf(command + length,
                     TSDB_MAX_ALLOWED_SQL_LEN - length,
                     " ROLLUP(%s)", stbInfo->rollup);
        fits = advanceIfFits(n, (int64_t)TSDB_MAX_ALLOWED_SQL_LEN - length, &length);
    }
    if (fits && stbInfo->max_delay != NULL) {
        n = snprintf(command + length,
                     TSDB_MAX_ALLOWED_SQL_LEN - length,
                     " MAX_DELAY %s", stbInfo->max_delay);
        fits = advanceIfFits(n, (int64_t)TSDB_MAX_ALLOWED_SQL_LEN - length, &length);
    }
    if (fits && stbInfo->watermark != NULL) {
        n = snprintf(command + length,
                     TSDB_MAX_ALLOWED_SQL_LEN - length,
                     " WATERMARK %s", stbInfo->watermark);
        fits = advanceIfFits(n, (int64_t)TSDB_MAX_ALLOWED_SQL_LEN - length, &length);
    }

    // not support ttl in super table

    for (int i = 0; fits && i < stbInfo->cols->size; i++) {
        Field * col = benchArrayGet(stbInfo->cols, i);
        if (!col->sma) {
            continue;
        }
        if (first_sma) {
            n = snprintf(command + length, TSDB_MAX_ALLOWED_SQL_LEN - length, " SMA(%s", col->name);
            first_sma = false;
        } else {
            n = snprintf(command + length, TSDB_MAX_ALLOWED_SQL_LEN - length, ",%s", col->name);
        }
        fits = advanceIfFits(n, (int64_t)TSDB_MAX_ALLOWED_SQL_LEN - length, &length);
    }
    if (fits && !first_sma) {
        n = snprintf(command + length, TSDB_MAX_ALLOWED_SQL_LEN - length, ")");
        fits = advanceIfFits(n, (int64_t)TSDB_MAX_ALLOWED_SQL_LEN - length, &length);
    }

    if (!fits) {
        errorPrint("%s() LN%d snprintf overflow\n", __func__, __LINE__);
        goto cleanup;
    }
    debugPrint("create stable: <%s>\n", command);

    ret = queryDbExec(database, stbInfo, command);

cleanup:
    tmfree(colsBuf);
    tmfree(tagsBuf);
    tmfree(command);
    return ret;
}
469

470

471
/**
 * Count the vgroups of a database and record their ids.
 *
 * Runs "SHOW <db>.VGROUPS" twice: the first pass counts the rows and sizes
 * database->vgArray (one zeroed SVGroup pushed per vgroup), the second pass
 * copies the first column of each row into the matching entry's vgId.
 * database->vgroups is set to the counted value.
 *
 * The second pass never writes past the entries created by the first one,
 * even if the server returns more rows the second time or population was
 * cut short by g_arguments->terminate.
 *
 * NOTE(review): on query failure the result handle is only passed to
 * printErrCmdCodeStr(); presumably that helper releases it - confirm.
 *
 * @param conn      open connection used for both queries
 * @param database  database to inspect; vgroups and vgArray are updated
 * @return number of vgroups counted, or -1 when a query fails
 */
int32_t getVgroupsNative(SBenchConn *conn, SDataBase *database) {
    int     vgroups = 0;
    char    cmd[SHORT_1K_SQL_BUFF_LEN] = "\0";
    snprintf(cmd, SHORT_1K_SQL_BUFF_LEN,
            g_arguments->escape_character
            ? "SHOW `%s`.VGROUPS"
            : "SHOW %s.VGROUPS",
            database->dbName);

    // pass 1: count rows
    TAOS_RES* res = taos_query(conn->taos, cmd);
    int code = taos_errno(res);
    if (code) {
        printErrCmdCodeStr(cmd, code, res);
        return -1;
    }

    TAOS_ROW row = NULL;
    while ((row = taos_fetch_row(res)) != NULL) {
        vgroups++;
    }
    debugPrint("%s() LN%d, vgroups: %d\n", __func__, __LINE__, vgroups);
    taos_free_result(res);

    database->vgroups = vgroups;
    database->vgArray = benchArrayInit(vgroups, sizeof(SVGroup));
    for (int32_t v = 0; (v < vgroups
            && !g_arguments->terminate); v++) {
        SVGroup *vg = benchCalloc(1, sizeof(SVGroup), true);
        benchArrayPush(database->vgArray, vg);
    }

    // pass 2: fill in the ids
    res = taos_query(conn->taos, cmd);
    code = taos_errno(res);
    if (code) {
        printErrCmdCodeStr(cmd, code, res);
        return -1;
    }

    int32_t vgItem = 0;
    while (vgItem < vgroups
            && !g_arguments->terminate
            && ((row = taos_fetch_row(res)) != NULL)) {
        SVGroup *vg = benchArrayGet(database->vgArray, vgItem);
        if (vg == NULL) {
            // fewer entries than counted (population was interrupted)
            break;
        }
        vg->vgId = *(int32_t*)row[0];
        vgItem++;
    }
    taos_free_result(res);

    return vgroups;
}
520

521
/**
 * Render the CREATE DATABASE statement for a database into command.
 *
 * Layout: "CREATE DATABASE IF NOT EXISTS <name>", then every configured
 * option except "vgroups" as " <name> <value>", then " VGROUPS <n>" when a
 * vgroup count is known (the command-line value g_arguments->inputted_vgroups
 * wins over a configured "vgroups" option), then a " PRECISION '..';" suffix
 * for millisecond, microsecond or nanosecond precision.
 *
 * Every write is bounded by SHORT_1K_SQL_BUFF_LEN, the size of the buffers
 * the callers in this file pass in. On truncation the statement is cut back
 * to its last complete clause and -1 is returned.
 *
 * @param database      database definition (name, cfgs, precision)
 * @param command       output buffer of at least SHORT_1K_SQL_BUFF_LEN bytes
 * @param remainVnodes  currently unused
 * @return length of the statement excluding the PRECISION suffix,
 *         or -1 when the statement does not fit
 */
int geneDbCreateCmd(SDataBase *database, char *command, int remainVnodes) {
    (void)remainVnodes;

    const int cap = SHORT_1K_SQL_BUFF_LEN;
    int dataLen = 0;
    int n;

    // create database
    n = snprintf(command + dataLen, cap - dataLen,
                g_arguments->escape_character
                    ? "CREATE DATABASE IF NOT EXISTS `%s`"
                    : "CREATE DATABASE IF NOT EXISTS %s",
                    database->dbName);

    if (n < 0 || n >= cap - dataLen) {
        errorPrint("%s() LN%d snprintf overflow\n",
                           __func__, __LINE__);
        return -1;
    }
    dataLen += n;

    int vgroups = g_arguments->inputted_vgroups;

    // append config items
    if (database->cfgs) {
        for (int i = 0; i < database->cfgs->size; i++) {
            SDbCfg* cfg = benchArrayGet(database->cfgs, i);

            // check vgroups
            if (trimCaseCmp(cfg->name, "vgroups") == 0) {
                if (vgroups > 0) {
                    // inputted vgroups by commandline
                    infoPrint("ignore config set vgroups %d\n", cfg->valueint);
                } else {
                    vgroups = cfg->valueint;
                }
                continue;
            }

            if (cfg->valuestring) {
                n = snprintf(command + dataLen, cap - dataLen,
                            " %s %s", cfg->name, cfg->valuestring);
            } else {
                n = snprintf(command + dataLen, cap - dataLen,
                            " %s %d", cfg->name, cfg->valueint);
            }
            if (n < 0 || n >= cap - dataLen) {
                errorPrint("%s() LN%d snprintf overflow on %d\n",
                           __func__, __LINE__, i);
                command[dataLen] = '\0';  // drop the partially written clause
                return -1;
            }
            dataLen += n;
        }
    }

    // not found vgroups
    if (vgroups > 0) {
        n = snprintf(command + dataLen, cap - dataLen, " VGROUPS %d", vgroups);
        if (n < 0 || n >= cap - dataLen) {
            errorPrint("%s() LN%d snprintf overflow\n",
                               __func__, __LINE__);
            command[dataLen] = '\0';
            return -1;
        }
        dataLen += n;
    }

    const char *precisionClause = NULL;
    switch (database->precision) {
        case TSDB_TIME_PRECISION_MILLI:
            precisionClause = " PRECISION \'ms\';";
            break;
        case TSDB_TIME_PRECISION_MICRO:
            precisionClause = " PRECISION \'us\';";
            break;
        case TSDB_TIME_PRECISION_NANO:
            precisionClause = " PRECISION \'ns\';";
            break;
        default:
            // other precision values add no suffix
            break;
    }
    if (precisionClause != NULL) {
        n = snprintf(command + dataLen, cap - dataLen, "%s", precisionClause);
        if (n < 0 || n >= cap - dataLen) {
            errorPrint("%s() LN%d snprintf overflow\n",
                               __func__, __LINE__);
            command[dataLen] = '\0';
            return -1;
        }
    }

    return dataLen;
}
599

600
/**
 * Drop and re-create a database through the REST interface.
 *
 * Opens one socket, posts DROP DATABASE IF EXISTS (a failure is only
 * logged), then posts the statement produced by geneDbCreateCmd(). The
 * create step is retried according to g_arguments->keep_trying (-1 retries
 * without limit) with g_arguments->trying_interval milliseconds between
 * attempts.
 *
 * @param database  database to (re)create
 * @return -1 when no socket could be opened, otherwise the result of the
 *         last CREATE attempt
 */
int createDatabaseRest(SDataBase* database) {
    char    sql[SHORT_1K_SQL_BUFF_LEN] = "\0";
    int32_t status = 0;

    int sockfd = createSockFd();
    if (sockfd < 0) {
        return -1;
    }

    // drop exist database
    snprintf(sql, SHORT_1K_SQL_BUFF_LEN,
            g_arguments->escape_character
                ? "DROP DATABASE IF EXISTS `%s`;"
                : "DROP DATABASE IF EXISTS %s;",
             database->dbName);
    status = postProcessSql(sql, database->dbName, database->precision,
                            REST_IFACE, 0, g_arguments->port, false,
                            sockfd, NULL);
    if (status != 0) {
        errorPrint("Failed to drop database %s\n", database->dbName);
    }

    // create database
    int remainVnodes = INT_MAX;
    geneDbCreateCmd(database, sql, remainVnodes);
    status = postProcessSql(sql, database->dbName, database->precision,
                            REST_IFACE, 0, g_arguments->port, false,
                            sockfd, NULL);

    for (int32_t retries = g_arguments->keep_trying; status && retries; ) {
        infoPrint("will sleep %"PRIu32" milliseconds then "
                "re-create database %s\n",
                g_arguments->trying_interval, database->dbName);
        toolsMsleep(g_arguments->trying_interval);
        status = postProcessSql(sql, database->dbName, database->precision,
                                REST_IFACE, 0, g_arguments->port, false,
                                sockfd, NULL);
        // -1 means "retry forever": leave the counter alone
        if (retries != -1) {
            retries--;
        }
    }

    destroySockFd(sockfd);
    return status;
}
663

664
/**
 * Sum, over every row of "SHOW DNODES", the difference between the fourth
 * and the third column (both read as int16_t).
 *
 * NOTE(review): presumably column 3 is the supported vnode count and
 * column 2 the vnodes in use - confirm against the server's result schema.
 *
 * On query failure the error is printed, the connection passed in is
 * CLOSED by this function, and -1 is returned; callers must not use or
 * close conn again in that case.
 *
 * @param conn  open connection used for the query
 * @return the accumulated difference, or -1 when the query fails
 */
int32_t getRemainVnodes(SBenchConn *conn) {
    char command[SHORT_1K_SQL_BUFF_LEN] = "SHOW DNODES";

    TAOS_RES *res = taos_query(conn->taos, command);
    int32_t   code = taos_errno(res);
    if (code) {
        printErrCmdCodeStr(command, code, res);
        closeBenchConn(conn);
        return -1;
    }

    int total = 0;
    for (TAOS_ROW row = taos_fetch_row(res); row != NULL;
         row = taos_fetch_row(res)) {
        int16_t fourthCol = *(int16_t*)(row[3]);
        int16_t thirdCol  = *(int16_t*)(row[2]);
        total += (fourthCol - thirdCol);
    }
    debugPrint("%s() LN%d, remainVnodes: %d\n",
               __func__, __LINE__, total);
    taos_free_result(res);
    return total;
}
684

685
/**
 * Drop and re-create a database over a native connection.
 *
 * Sequence:
 *  1. Drop every stream in g_arguments->streams whose `drop` flag is set.
 *  2. DROP DATABASE IF EXISTS; any failure aborts with -1.
 *  3. When g_arguments->bind_vgroup is set, require a positive result from
 *     getRemainVnodes().
 *  4. Execute the statement from geneDbCreateCmd(), retrying according to
 *     g_arguments->keep_trying (-1 retries without limit).
 *  5. When g_arguments->bind_vgroup is set, load the vgroup list with
 *     getVgroupsNative() and require at least one vgroup.
 *
 * NOTE(review): on the "remain vnodes <= 0" exit the connection is not
 * closed here. getRemainVnodes() closes it on its own error path, but a
 * successful call that yields a non-positive total appears to leave the
 * connection open - confirm.
 *
 * @param database  database to (re)create
 * @return 0 on success, -1 on any failure
 */
int createDatabaseTaosc(SDataBase* database) {
    char sql[SHORT_1K_SQL_BUFF_LEN] = "\0";

    // conn
    SBenchConn* conn = initBenchConn(NULL);
    if (NULL == conn) {
        return -1;
    }

    // drop stream in old database
    for (int i = 0; i < g_arguments->streams->size; i++) {
        SSTREAM* stream = benchArrayGet(g_arguments->streams, i);
        if (!stream->drop) {
            continue;
        }
        snprintf(sql, SHORT_1K_SQL_BUFF_LEN,
                    "DROP STREAM IF EXISTS %s;",
                stream->stream_name);
        if (queryDbExecCall(conn, sql)) {
            closeBenchConn(conn);
            return -1;
        }
        infoPrint("%s\n", sql);
        memset(sql, 0, SHORT_1K_SQL_BUFF_LEN);
    }

    // drop old database
    snprintf(sql, SHORT_1K_SQL_BUFF_LEN,
            g_arguments->escape_character
                ? "DROP DATABASE IF EXISTS `%s`;":
            "DROP DATABASE IF EXISTS %s;",
             database->dbName);
    if (0 != queryDbExecCall(conn, sql)) {
        if (g_arguments->dsn) {
            // websocket
            warnPrint("%s", "TDengine cloud normal users have no privilege "
                      "to drop database! DROP DATABASE failure is ignored!\n");
        }
        closeBenchConn(conn);
        return -1;
    }

    // get remain vgroups
    int remainVnodes = INT_MAX;
    if (g_arguments->bind_vgroup) {
        remainVnodes = getRemainVnodes(conn);
        if (0 >= remainVnodes) {
            errorPrint("Remain vnodes %d, failed to create database\n",
                       remainVnodes);
            return -1;
        }
    }

    // generate and execute create database sql
    geneDbCreateCmd(database, sql, remainVnodes);
    int32_t status = queryDbExecCall(conn, sql);
    for (int32_t retries = g_arguments->keep_trying; status && retries; ) {
        infoPrint("will sleep %"PRIu32" milliseconds then "
                  "re-create database %s\n",
                  g_arguments->trying_interval, database->dbName);
        toolsMsleep(g_arguments->trying_interval);
        status = queryDbExecCall(conn, sql);
        // -1 means "retry forever": leave the counter alone
        if (retries != -1) {
            retries--;
        }
    }

    if (status) {
        if (g_arguments->dsn) {
            warnPrint("%s", "TDengine cloud normal users have no privilege "
                      "to create database! CREATE DATABASE "
                      "failure is ignored!\n");
        }

        closeBenchConn(conn);
        errorPrint("\ncreate database %s failed!\n\n", database->dbName);
        return -1;
    }
    infoPrint("command to create database: <%s>\n", sql);

    // malloc and get vgroup
    if (g_arguments->bind_vgroup) {
        int32_t vgroups = getVgroupsNative(conn, database);
        if (vgroups <= 0) {
            closeBenchConn(conn);
            errorPrint("Database %s's vgroups is %d\n",
                        database->dbName, vgroups);
            return -1;
        }
    }

    closeBenchConn(conn);
    return 0;
}
780

781
/*
 * Create `database` through the backend selected by the global interface.
 *
 * When isRest(g_arguments->iface) is true the REST implementation is used,
 * otherwise the native (taosc) one.
 *
 * Returns whatever the selected implementation returns.
 */
int createDatabase(SDataBase* database) {
    if (!isRest(g_arguments->iface)) {
        return createDatabaseTaosc(database);
    }
    return createDatabaseRest(database);
}
790

791
static int generateChildTblName(int len, char *buffer, SDataBase *database,
28,025✔
792
                                SSuperTable *stbInfo, uint64_t tableSeq, char* tagData, int i,
793
                                char *ttl, char *tableName) {
794
    if (0 == len) {
28,025✔
795
        memset(buffer, 0, TSDB_MAX_ALLOWED_SQL_LEN);
23,706✔
796
        len += snprintf(buffer + len,
23,706✔
797
                        TSDB_MAX_ALLOWED_SQL_LEN - len, "CREATE TABLE");
23,706✔
798
    }
799

800
    const char *tagStart = tagData + i * stbInfo->lenOfTags;  // start position of current row's TAG
28,025✔
801
    const char *tagsForSQL = tagStart;  // actual TAG part for SQL
28,025✔
802
    const char *fmt = g_arguments->escape_character ?
56,050✔
803
        " IF NOT EXISTS `%s`.`%s` USING `%s`.`%s` TAGS (%s) %s " :
28,025✔
804
        " IF NOT EXISTS %s.%s USING %s.%s TAGS (%s) %s ";
805

806
    if (stbInfo->useTagTableName) {
28,025✔
807
        char *firstComma = strchr(tagStart, ',');
3✔
808
        if (firstComma == NULL && tagStart >= firstComma) {
3!
809
            errorPrint("Invalid tag data format: %s\n", tagStart);
×
810
            return -1;
×
811
        }
812
        size_t nameLen = firstComma - tagStart;
3✔
813
        strncpy(tableName, tagStart, nameLen);
3✔
814
        tableName[nameLen] = '\0';
3✔
815
        tagsForSQL = firstComma + 1;      
3✔
816
    } else {
817
        // generate table name using prefix + sequence number 
818
        snprintf(tableName, TSDB_TABLE_NAME_LEN, "%s%" PRIu64, 
28,022✔
819
                 stbInfo->childTblPrefix, tableSeq);
820
        tagsForSQL = tagStart;
28,022✔
821

822
    }        
823

824
    len += snprintf(buffer + len, TSDB_MAX_ALLOWED_SQL_LEN - len, fmt,
28,025✔
825
                    database->dbName, tableName,
826
                    database->dbName, stbInfo->stbName,
827
                    tagsForSQL, ttl);
828
    debugPrint("create table: <%s> <%s>\n", buffer, tableName);
28,025✔
829
    return len;
28,022✔
830
}
831

832
static int getBatchOfTblCreating(threadInfo *pThreadInfo,
28,036✔
833
                                         SSuperTable *stbInfo) {
834
    BArray *batchArray = stbInfo->batchTblCreatingNumbersArray;
28,036✔
835
    if (batchArray) {
28,036!
836
        int *batch = benchArrayGet(
×
837
                batchArray, pThreadInfo->posOfTblCreatingBatch);
×
838
        pThreadInfo->posOfTblCreatingBatch++;
×
839
        if (pThreadInfo->posOfTblCreatingBatch == batchArray->size) {
×
840
            pThreadInfo->posOfTblCreatingBatch = 0;
×
841
        }
842
        return *batch;
×
843
    }
844
    return 0;
28,036✔
845
}
846

847
static int getIntervalOfTblCreating(threadInfo *pThreadInfo,
23,348✔
848
                                         SSuperTable *stbInfo) {
849
    BArray *intervalArray = stbInfo->batchTblCreatingIntervalsArray;
23,348✔
850
    if (intervalArray) {
23,348!
851
        int *interval = benchArrayGet(
×
852
                intervalArray, pThreadInfo->posOfTblCreatingInterval);
×
853
        pThreadInfo->posOfTblCreatingInterval++;
×
854
        if (pThreadInfo->posOfTblCreatingInterval == intervalArray->size) {
×
855
            pThreadInfo->posOfTblCreatingInterval = 0;
×
856
        }
857
        return *interval;
×
858
    }
859
    return 0;
23,348✔
860
}
861

862
// table create thread
863
static void *createTable(void *sarg) {
892✔
864
    if (g_arguments->supplementInsert) {
892✔
865
        return NULL;
1✔
866
    }
867

868
    threadInfo  *pThreadInfo    = (threadInfo *)sarg;
891✔
869
    SDataBase   *database       = pThreadInfo->dbInfo;
891✔
870
    SSuperTable *stbInfo        = pThreadInfo->stbInfo;
891✔
871
    uint64_t    lastTotalCreate = 0;
891✔
872
    uint64_t    lastPrintTime   = toolsGetTimestampMs();
891✔
873
    int32_t     len             = 0;
891✔
874
    int32_t     batchNum        = 0;
891✔
875
    char ttl[SMALL_BUFF_LEN]    = "";
891✔
876

877
#ifdef LINUX
878
    prctl(PR_SET_NAME, "createTable");
891✔
879
#endif
880
    pThreadInfo->buffer = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, false);
891✔
881
    infoPrint(
891✔
882
              "thread[%d] start creating table from %" PRIu64 " to %" PRIu64
883
              "\n",
884
              pThreadInfo->threadID, pThreadInfo->start_table_from,
885
              pThreadInfo->end_table_to);
886
    if (stbInfo->ttl != 0) {
891✔
887
        snprintf(ttl, SMALL_BUFF_LEN, "TTL %d", stbInfo->ttl);
16✔
888
    }
889

890
    // tag read from csv
891
    FILE *csvFile = openTagCsv(stbInfo, pThreadInfo->start_table_from);
891✔
892
    // malloc
893
    char* tagData = benchCalloc(TAG_BATCH_COUNT, stbInfo->lenOfTags, false);
891✔
894
    int         w = 0; // record tagData
891✔
895

896
    int smallBatchCount = 0;
891✔
897
    int index=  pThreadInfo->start_table_from;
891✔
898
    int tableSum = pThreadInfo->end_table_to - pThreadInfo->start_table_from + 1;
891✔
899
    if (stbInfo->useTagTableName) {
891✔
900
        pThreadInfo->childNames = benchCalloc(tableSum, sizeof(char *), false);
3✔
901
        pThreadInfo->childTblCount = tableSum;
3✔
902
    }
903

904
    for (uint64_t i = pThreadInfo->start_table_from, j = 0;
891✔
905
                  i <= pThreadInfo->end_table_to && !g_arguments->terminate;
28,939✔
906
                  i++, j++) {
28,048✔
907
        if (g_arguments->terminate) {
28,049!
908
            goto create_table_end;
×
909
        }
910
        if (!stbInfo->use_metric || stbInfo->tags->size == 0) {
28,049!
911
            if (stbInfo->childTblCount == 1) {
17✔
912
                snprintf(pThreadInfo->buffer, TSDB_MAX_ALLOWED_SQL_LEN,
4✔
913
                         g_arguments->escape_character
4✔
914
                         ? "CREATE TABLE IF NOT EXISTS `%s`.`%s` %s;"
915
                         : "CREATE TABLE IF NOT EXISTS %s.%s %s;",
916
                         database->dbName, stbInfo->stbName,
917
                         stbInfo->colsOfCreateChildTable);
918
            } else {
919
                snprintf(pThreadInfo->buffer, TSDB_MAX_ALLOWED_SQL_LEN,
13✔
920
                         g_arguments->escape_character
13✔
921
                         ? "CREATE TABLE IF NOT EXISTS `%s`.`%s` %s;"
922
                         : "CREATE TABLE IF NOT EXISTS %s.%s %s;",
923
                         database->dbName,
924
                         stbInfo->childTblArray[i]->name,
13✔
925
                         stbInfo->colsOfCreateChildTable);
926
            }
927
            batchNum++;
17✔
928
        } else {
929
            if (0 == len) {
28,032✔
930
                batchNum = 0;
23,715✔
931
            }
932
            // generator
933
            if (w == 0) {
28,032✔
934
                if(!generateTagData(stbInfo, tagData, TAG_BATCH_COUNT, csvFile, NULL, index)) {
1,076!
935
                    goto create_table_end;
×
936
                }
937
            }
938
            char tbName[TSDB_TABLE_NAME_LEN] = {0};
28,032✔
939
            len = generateChildTblName(len, pThreadInfo->buffer,
28,032✔
940
                                       database, stbInfo, i, tagData, w, ttl, tbName);
941
            if (stbInfo->useTagTableName) {                     
28,027✔
942
                pThreadInfo->childNames[j] = strdup(tbName);
3✔
943
            }
944
            // move next
945
            if (++w >= TAG_BATCH_COUNT) {
28,027✔
946
                // reset for gen again
947
                w = 0;
230✔
948
                index += TAG_BATCH_COUNT;
230✔
949
            }                           
950

951
            batchNum++;
28,027✔
952
            smallBatchCount++;
28,027✔
953

954
            int smallBatch = getBatchOfTblCreating(pThreadInfo, stbInfo);
28,027✔
955
            if ((!smallBatch || (smallBatchCount == smallBatch))
28,037!
956
                    && (batchNum < stbInfo->batchTblCreatingNum)
28,037✔
957
                    && ((TSDB_MAX_ALLOWED_SQL_LEN - len) >=
4,710✔
958
                        (stbInfo->lenOfTags + EXTRA_SQL_LEN))) {
4,710!
959
                continue;
4,710✔
960
            } else {
961
                smallBatchCount = 0;
23,327✔
962
            }
963
        }
964

965
        len = 0;
23,344✔
966

967
        int ret = 0;
23,344✔
968
        debugPrint("thread[%d] creating table: %s\n", pThreadInfo->threadID,
23,344✔
969
                   pThreadInfo->buffer);
970
        // REST
971
        if (REST_IFACE == stbInfo->iface) {
23,338!
972
            ret = queryDbExecRest(pThreadInfo->buffer,
×
973
                                  database->dbName,
974
                                  database->precision,
975
                                  stbInfo->iface,
×
976
                                  stbInfo->lineProtocol,
×
977
                                  stbInfo->tcpTransfer,
×
978
                                  pThreadInfo->sockfd);
979
        } else {
980
            ret = queryDbExecCall(pThreadInfo->conn, pThreadInfo->buffer);
23,338✔
981
            int32_t trying = g_arguments->keep_trying;
23,344✔
982
            while (ret && trying) {
23,344!
983
                infoPrint("will sleep %"PRIu32" milliseconds then re-create "
×
984
                          "table %s\n",
985
                          g_arguments->trying_interval, pThreadInfo->buffer);
986
                toolsMsleep(g_arguments->trying_interval);
×
987
                ret = queryDbExecCall(pThreadInfo->conn, pThreadInfo->buffer);
×
988
                if (trying != -1) {
×
989
                    trying--;
×
990
                }
991
            }
992
        }
993

994
        if (0 != ret) {
23,349!
995
            g_fail = true;
×
996
            goto create_table_end;
×
997
        }
998
        uint64_t intervalOfTblCreating = getIntervalOfTblCreating(pThreadInfo,
23,349✔
999
                                                                  stbInfo);
1000
        if (intervalOfTblCreating) {
23,348!
1001
            debugPrint("will sleep %"PRIu64" milliseconds "
×
1002
                       "for table creating interval\n", intervalOfTblCreating);
1003
            toolsMsleep(intervalOfTblCreating);
×
1004
        }
1005

1006
        pThreadInfo->tables_created += batchNum;
23,348✔
1007
        batchNum = 0;
23,348✔
1008
        uint64_t currentPrintTime = toolsGetTimestampMs();
23,348✔
1009
        if (currentPrintTime - lastPrintTime > PRINT_STAT_INTERVAL) {
23,340!
1010
            float speed = (pThreadInfo->tables_created - lastTotalCreate) * 1000 / (currentPrintTime - lastPrintTime);
×
1011
            infoPrint("thread[%d] already created %" PRId64 " tables, peroid speed: %.0f tables/s\n",
×
1012
                       pThreadInfo->threadID, pThreadInfo->tables_created, speed);
1013
            lastPrintTime   = currentPrintTime;
×
1014
            lastTotalCreate = pThreadInfo->tables_created;
×
1015
        }
1016
    }
1017

1018
    if (0 != len) {
890✔
1019
        int ret = 0;
389✔
1020
        debugPrint("thread[%d] creating table: %s\n", pThreadInfo->threadID,
389!
1021
                   pThreadInfo->buffer);
1022
        // REST
1023
        if (REST_IFACE == stbInfo->iface) {
389✔
1024
            ret = queryDbExecRest(pThreadInfo->buffer,
8✔
1025
                                  database->dbName,
1026
                                  database->precision,
1027
                                  stbInfo->iface,
8✔
1028
                                  stbInfo->lineProtocol,
8✔
1029
                                  stbInfo->tcpTransfer,
8✔
1030
                                  pThreadInfo->sockfd);
1031
        } else {
1032
            ret = queryDbExecCall(pThreadInfo->conn, pThreadInfo->buffer);
381✔
1033
        }
1034
        if (0 != ret) {
389!
1035
            g_fail = true;
×
1036
            goto create_table_end;
×
1037
        }
1038
        pThreadInfo->tables_created += batchNum;
389✔
1039
        debugPrint("thread[%d] already created %" PRId64 " tables\n",
389!
1040
                   pThreadInfo->threadID, pThreadInfo->tables_created);
1041
    }
1042
create_table_end:
890✔
1043
    // free
1044
    tmfree(tagData);
890✔
1045
    tmfree(pThreadInfo->buffer);
891✔
1046
    pThreadInfo->buffer = NULL;
891✔
1047
    if(csvFile) {
891✔
1048
        fclose(csvFile);
14✔
1049
    }
1050
    return NULL;
891✔
1051
}
1052

1053
/*
 * Create the child tables of `stbInfo` with up to
 * g_arguments->table_threads worker threads running createTable().
 *
 * The table range starts at stbInfo->childTblFrom; the number of tables is
 * derived from childTblTo / childTblFrom / childTblCount and split as evenly
 * as possible between the workers (the first `mod` workers get one extra
 * table). REST workers get a socket from createSockFd(), all others get
 * their own connection from initBenchConn().
 *
 * If a worker cannot be set up (socket, connection or thread creation
 * fails), no further workers are started, but the ones already running are
 * always joined and their connections closed before the shared arrays are
 * released.
 *
 * After the join, each worker's tables_created is added to
 * g_arguments->actualChildTables and, with stbInfo->useTagTableName, the
 * names collected by the workers are handed over to stbInfo->childTblArray.
 *
 * Returns 0 on success (including "nothing to create"), -1 when a worker
 * could not be set up or g_fail is set.
 */
static int startMultiThreadCreateChildTable(SDataBase* database, SSuperTable* stbInfo) {
    int32_t code        = -1;
    int32_t threads     = g_arguments->table_threads;
    bool    startFailed = false;
    int64_t ntables;

    if (stbInfo->childTblTo > 0) {
        ntables = stbInfo->childTblTo - stbInfo->childTblFrom;
    } else if(stbInfo->childTblFrom > 0) {
        ntables = stbInfo->childTblCount - stbInfo->childTblFrom;
    } else {
        ntables = stbInfo->childTblCount;
    }
    // clamp BEFORE sizing the arrays so at least one slot always exists
    if (threads < 1) {
        threads = 1;
    }
    pthread_t   *pids = benchCalloc(1, (size_t)threads * sizeof(pthread_t), false);
    threadInfo  *infos = benchCalloc(1, (size_t)threads * sizeof(threadInfo), false);
    uint64_t     tableFrom = stbInfo->childTblFrom;
    if (ntables == 0) {
        code = 0;
        infoPrint("child table is zero, no need create. childTblCount: %"PRId64"\n", ntables);
        goto over;
    }

    int64_t div = ntables / threads;
    if (div < 1) {
        // fewer tables than threads: one table per thread
        threads = (int)ntables;
        div = 1;
    }
    int64_t mod = ntables % threads;

    int threadCnt = 0;  // number of workers actually started
    for (int32_t i = 0; (i < threads && !g_arguments->terminate); i++) {
        threadInfo *pThreadInfo = infos + i;
        pThreadInfo->threadID = i;
        pThreadInfo->stbInfo = stbInfo;
        pThreadInfo->dbInfo = database;
        if (REST_IFACE == stbInfo->iface) {
            int sockfd = createSockFd();
            if (sockfd < 0) {
                // running workers still use `infos`; join them below first
                startFailed = true;
                break;
            }
            pThreadInfo->sockfd = sockfd;
        } else {
            pThreadInfo->conn = initBenchConn(pThreadInfo->dbInfo->dbName);
            if (NULL == pThreadInfo->conn) {
                startFailed = true;
                break;
            }
        }
        pThreadInfo->start_table_from = tableFrom;
        pThreadInfo->ntables          = i < mod ? div + 1 : div;
        pThreadInfo->end_table_to     = i < mod ? tableFrom + div : tableFrom + div - 1;
        tableFrom = pThreadInfo->end_table_to + 1;
        pThreadInfo->tables_created = 0;
        debugPrint("div table by thread. i=%d from=%"PRId64" to=%"PRId64" ntable=%"PRId64"\n", i, pThreadInfo->start_table_from,
                                        pThreadInfo->end_table_to, pThreadInfo->ntables);
        if (pthread_create(pids + i, NULL, createTable, pThreadInfo) != 0) {
            errorPrint("failed to create thread %d for creating child tables\n", i);
            // this slot is not counted in threadCnt, so release its
            // connection here instead of in the loop after the join
            if (REST_IFACE != stbInfo->iface) {
                closeBenchConn(pThreadInfo->conn);
                pThreadInfo->conn = NULL;
            }
            startFailed = true;
            break;
        }
        threadCnt ++;
    }

    for (int i = 0; i < threadCnt; i++) {
        pthread_join(pids[i], NULL);
    }

    if (g_arguments->terminate)  toolsMsleep(100);

    int nCount = 0;
    for (int i = 0; i < threadCnt; i++) {
        threadInfo *pThreadInfo = infos + i;
        g_arguments->actualChildTables += pThreadInfo->tables_created;

        if ((REST_IFACE != stbInfo->iface) && pThreadInfo->conn) {
            closeBenchConn(pThreadInfo->conn);
        }

        if (stbInfo->useTagTableName) {
            // ownership of the name strings moves to childTblArray
            for (int j = 0; j < pThreadInfo->childTblCount; j++) {
                stbInfo->childTblArray[nCount++]->name = pThreadInfo->childNames[j];
            }
            tmfree(pThreadInfo->childNames);
        }
    }

    if (g_fail || startFailed) {
        goto over;
    }
    code = 0;
over:
    free(pids);
    free(infos);
    return code;
}
1145

1146
static int createChildTables() {
200✔
1147
    int32_t    code;
1148
    infoPrint("start creating %" PRId64 " table(s) with %d thread(s)\n",
200✔
1149
              g_arguments->totalChildTables, g_arguments->table_threads);
1150
    if (g_arguments->fpOfInsertResult) {
200!
1151
        infoPrintToFile(
200✔
1152
                  "start creating %" PRId64 " table(s) with %d thread(s)\n",
1153
                  g_arguments->totalChildTables, g_arguments->table_threads);
1154
    }
1155
    int64_t start = (double)toolsGetTimestampMs();
200✔
1156

1157
    for (int i = 0; (i < g_arguments->databases->size
200✔
1158
            && !g_arguments->terminate); i++) {
401!
1159
        SDataBase * database = benchArrayGet(g_arguments->databases, i);
201✔
1160
        if (database->superTbls) {
201!
1161
            for (int j = 0; (j < database->superTbls->size
201✔
1162
                    && !g_arguments->terminate); j++) {
474!
1163
                SSuperTable * stbInfo = benchArrayGet(database->superTbls, j);
273✔
1164
                if (stbInfo->autoTblCreating || stbInfo->iface == SML_IFACE
273✔
1165
                    || stbInfo->iface == SML_REST_IFACE) {
221✔
1166
                    g_arguments->autoCreatedChildTables +=
60✔
1167
                            stbInfo->childTblCount;
60✔
1168
                    continue;
60✔
1169
                }
1170
                if (stbInfo->childTblExists) {
213✔
1171
                    g_arguments->existedChildTables +=
11✔
1172
                            stbInfo->childTblCount;
11✔
1173
                    continue;
11✔
1174
                }
1175
                debugPrint("colsOfCreateChildTable: %s\n",
202✔
1176
                        stbInfo->colsOfCreateChildTable);
1177

1178
                code = startMultiThreadCreateChildTable(database, stbInfo);
202✔
1179
                if (code && !g_arguments->terminate) {
202!
1180
                    return code;
×
1181
                }
1182
            }
1183
        }
1184
    }
1185

1186
    int64_t end = toolsGetTimestampMs();
200✔
1187
    if(end == start) {
200✔
1188
        end += 1;
31✔
1189
    }
1190
    succPrint(
200!
1191
            "Spent %.4f seconds to create %" PRId64
1192
            " table(s) with %d thread(s) speed: %.0f tables/s, already exist %" PRId64
1193
            " table(s), actual %" PRId64 " table(s) pre created, %" PRId64
1194
            " table(s) will be auto created\n",
1195
            (float)(end - start) / 1000.0,
1196
            g_arguments->totalChildTables,
1197
            g_arguments->table_threads,
1198
            g_arguments->actualChildTables * 1000 / (float)(end - start),
1199
            g_arguments->existedChildTables,
1200
            g_arguments->actualChildTables,
1201
            g_arguments->autoCreatedChildTables);
1202
    return 0;
200✔
1203
}
1204

1205
/*
 * Release a child table descriptor.
 *
 * When the table uses its own sample data (useOwnSample), the stmt buffers
 * (data, is_null, lengths) of its first `colsSize` child columns, the
 * childCols array itself and sampleDataBuf are released first. The
 * descriptor itself is released in every case.
 */
static void freeChildTable(SChildTable *childTbl, int colsSize) {
    if (!childTbl->useOwnSample) {
        tmfree(childTbl);
        return;
    }

    if (childTbl->childCols) {
        for (int idx = 0; idx < colsSize; idx++) {
            ChildField *field = benchArrayGet(childTbl->childCols, idx);
            if (!field) {
                continue;
            }
            tmfree(field->stmtData.data);
            field->stmtData.data = NULL;
            tmfree(field->stmtData.is_null);
            field->stmtData.is_null = NULL;
            tmfree(field->stmtData.lengths);
            field->stmtData.lengths = NULL;
        }
        benchArrayDestroy(childTbl->childCols);
    }

    tmfree(childTbl->sampleDataBuf);
    tmfree(childTbl);
}
49,165✔
1226

1227
void postFreeResource() {
241✔
1228
    infoPrint("%s\n", "free resource and exit ...");
241✔
1229
    if (!g_arguments->terminate) {
241!
1230
        tmfclose(g_arguments->fpOfInsertResult);
241✔
1231
    }
1232

1233
    for (int i = 0; i < g_arguments->databases->size; i++) {
483✔
1234
        SDataBase * database = benchArrayGet(g_arguments->databases, i);
242✔
1235
        if (database->cfgs) {
242!
1236
            for (int c = 0; c < database->cfgs->size; c++) {
407✔
1237
                SDbCfg *cfg = benchArrayGet(database->cfgs, c);
165✔
1238
                if (cfg->valuestring && cfg->free) {
165!
1239
                    tmfree(cfg->valuestring);
×
1240
                    cfg->valuestring = NULL;
×
1241
                }
1242
            }
1243
            benchArrayDestroy(database->cfgs);
242✔
1244
        }
1245
        if (database->superTbls) {
242!
1246
            for (uint64_t j = 0; j < database->superTbls->size; j++) {
559✔
1247
                SSuperTable * stbInfo = benchArrayGet(database->superTbls, j);
317✔
1248
                tmfree(stbInfo->colsOfCreateChildTable);
317✔
1249
                stbInfo->colsOfCreateChildTable = NULL;
317✔
1250
                tmfree(stbInfo->sampleDataBuf);
317✔
1251
                stbInfo->sampleDataBuf = NULL;
317✔
1252
                tmfree(stbInfo->partialColNameBuf);
317✔
1253
                stbInfo->partialColNameBuf = NULL;
317✔
1254
                benchArrayDestroy(stbInfo->batchTblCreatingNumbersArray);
317✔
1255
                benchArrayDestroy(stbInfo->batchTblCreatingIntervalsArray);
317✔
1256
                for (int k = 0; k < stbInfo->tags->size; k++) {
1,943✔
1257
                    Field * tag = benchArrayGet(stbInfo->tags, k);
1,626✔
1258
                    tmfree(tag->stmtData.data);
1,626✔
1259
                    tag->stmtData.data = NULL;
1,626✔
1260
                    tmfree(tag->stmtData.is_null);
1,626✔
1261
                    tag->stmtData.is_null = NULL;
1,626✔
1262
                    tmfree(tag->stmtData.lengths);
1,626✔
1263
                    tag->stmtData.lengths = NULL;
1,626✔
1264
                }
1265
                benchArrayDestroy(stbInfo->tags);
317✔
1266

1267
                for (int k = 0; k < stbInfo->cols->size; k++) {
29,015✔
1268
                    Field * col = benchArrayGet(stbInfo->cols, k);
28,698✔
1269
                    tmfree(col->stmtData.data);
28,698✔
1270
                    col->stmtData.data = NULL;
28,698✔
1271
                    tmfree(col->stmtData.is_null);
28,698✔
1272
                    col->stmtData.is_null = NULL;
28,698✔
1273
                    tmfree(col->stmtData.lengths);
28,698✔
1274
                    col->stmtData.lengths = NULL;
28,698✔
1275
                }
1276
                if (g_arguments->test_mode == INSERT_TEST) {
317✔
1277
                    if (stbInfo->childTblArray) {
282✔
1278
                        for (int64_t child = 0; child < stbInfo->childTblCount;
49,438✔
1279
                                child++) {
49,165✔
1280
                            SChildTable *childTbl = stbInfo->childTblArray[child];
49,165✔
1281
                            if (childTbl) {
49,165!
1282
                                tmfree(childTbl->name);
49,165✔
1283
                                freeChildTable(childTbl, stbInfo->cols->size);
49,165✔
1284
                            }
1285
                        }
1286
                    }
1287
                }
1288
                benchArrayDestroy(stbInfo->cols);
317✔
1289
                tmfree(stbInfo->childTblArray);
317✔
1290
                stbInfo->childTblArray = NULL;
317✔
1291
                benchArrayDestroy(stbInfo->tsmas);
317✔
1292

1293
                // free sqls
1294
                if(stbInfo->sqls) {
317✔
1295
                    char **sqls = stbInfo->sqls;
16✔
1296
                    while (*sqls) {
80✔
1297
                        free(*sqls);
64✔
1298
                        sqls++;
64✔
1299
                    }
1300
                    tmfree(stbInfo->sqls);
16✔
1301
                }
1302

1303
                // thread_bind
1304
                if (database->vgArray) {
317✔
1305
                    for (int32_t v = 0; v < database->vgroups; v++) {
22✔
1306
                        SVGroup *vg = benchArrayGet(database->vgArray, v);
20✔
1307
                        tmfree(vg->childTblArray);
20✔
1308
                        vg->childTblArray = NULL;
20✔
1309
                    }
1310
                    benchArrayDestroy(database->vgArray);
2✔
1311
                    database->vgArray = NULL;
2✔
1312
                }
1313
            }
1314
            benchArrayDestroy(database->superTbls);
242✔
1315
        }
1316
    }
1317
    benchArrayDestroy(g_arguments->databases);
241✔
1318
    benchArrayDestroy(g_arguments->streams);
241✔
1319
    tools_cJSON_Delete(root);
241✔
1320
}
241✔
1321

1322
/*
 * Submit the data prepared in `pThreadInfo` through the interface of its
 * super table.
 *
 * pThreadInfo - thread context (connection / socket, SQL buffer, SML lines).
 * k           - number of prepared lines; used by the schemaless interfaces.
 * delay3      - optional accumulator; for STMT_IFACE without auto table
 *               creation the time spent in taos_stmt_add_batch()
 *               (toolsGetTimestampUs() difference) is added to it.
 *
 * Retry policy: the super table's keep_trying / trying_interval are used
 * when non-zero, otherwise the global ones. The SML_IFACE branch always uses
 * stbInfo->keep_trying. A retry count of -1 is never decremented. Only the
 * TAOSC, REST and SML branches retry.
 *
 * Returns 0 on success; -1 for stmt/stmt2 failures; otherwise the code
 * reported by the underlying call. An interface value matching no case
 * returns 0.
 */
int32_t execInsert(threadInfo *pThreadInfo, uint32_t k, int64_t *delay3) {
    SSuperTable *stbInfo    = pThreadInfo->stbInfo;
    SDataBase   *database   = pThreadInfo->dbInfo;
    uint16_t     iface      = stbInfo->iface;
    int          protocol   = stbInfo->lineProtocol;
    int32_t      code       = 0;

    // per-super-table settings win; zero means "use the global setting"
    int32_t trying = g_arguments->keep_trying;
    if (stbInfo->keep_trying) {
        trying = stbInfo->keep_trying;
    }
    int32_t trying_interval = g_arguments->trying_interval;
    if (stbInfo->trying_interval) {
        trying_interval = stbInfo->trying_interval;
    }

    switch (iface) {
        case TAOSC_IFACE: {
            debugPrint("buffer: %s\n", pThreadInfo->buffer);
            code = queryDbExecCall(pThreadInfo->conn, pThreadInfo->buffer);
            while (code && trying && !g_arguments->terminate) {
                infoPrint("will sleep %"PRIu32" milliseconds then re-insert\n",
                          trying_interval);
                toolsMsleep(trying_interval);
                code = queryDbExecCall(pThreadInfo->conn, pThreadInfo->buffer);
                if (-1 != trying) {
                    --trying;
                }
            }
            break;
        }

        // REST
        case REST_IFACE: {
            debugPrint("buffer: %s\n", pThreadInfo->buffer);
            code = postProcessSql(pThreadInfo->buffer,
                                  database->dbName,
                                  database->precision,
                                  stbInfo->iface,
                                  stbInfo->lineProtocol,
                                  g_arguments->port,
                                  stbInfo->tcpTransfer,
                                  pThreadInfo->sockfd,
                                  pThreadInfo->filePath);
            while (code && trying && !g_arguments->terminate) {
                infoPrint("will sleep %"PRIu32" milliseconds then re-insert\n",
                          trying_interval);
                toolsMsleep(trying_interval);
                code = postProcessSql(pThreadInfo->buffer,
                                      database->dbName,
                                      database->precision,
                                      stbInfo->iface,
                                      stbInfo->lineProtocol,
                                      g_arguments->port,
                                      stbInfo->tcpTransfer,
                                      pThreadInfo->sockfd,
                                      pThreadInfo->filePath);
                if (-1 != trying) {
                    --trying;
                }
            }
            break;
        }

        case STMT_IFACE: {
            // add batch (skipped when tables are auto created)
            if (!stbInfo->autoTblCreating) {
                int64_t addBatchBegin = toolsGetTimestampUs();
                if (0 != taos_stmt_add_batch(pThreadInfo->conn->stmt)) {
                    errorPrint("taos_stmt_add_batch() failed! reason: %s\n",
                            taos_stmt_errstr(pThreadInfo->conn->stmt));
                    return -1;
                }
                if (delay3) {
                    *delay3 += toolsGetTimestampUs() - addBatchBegin;
                }
            }

            // execute
            code = taos_stmt_execute(pThreadInfo->conn->stmt);
            if (code) {
                errorPrint(
                           "failed to execute insert statement. reason: %s\n",
                           taos_stmt_errstr(pThreadInfo->conn->stmt));
                code = -1;
            }
            break;
        }

        case STMT2_IFACE: {
            int32_t affectRows = 0;
            // execute
            code = taos_stmt2_exec(pThreadInfo->conn->stmt2, &affectRows);
            if (code) {
                errorPrint( "failed to call taos_stmt2_exec(). reason: %s\n", taos_stmt2_error(pThreadInfo->conn->stmt2));
                code = -1;
            }
            debugPrint( "succ call taos_stmt2_exec() affectRows:%d\n", affectRows);
            break;
        }

        case SML_IFACE: {
            // arguments of taos_schemaless_insert(), identical for every attempt
            uint32_t lineCount =
                (TSDB_SML_JSON_PROTOCOL == protocol
                    || SML_JSON_TAOS_FORMAT == protocol)
                    ? 0 : k;
            int wireProtocol =
                (SML_JSON_TAOS_FORMAT == protocol)
                    ? TSDB_SML_JSON_PROTOCOL : protocol;
            int wirePrecision =
                (TSDB_SML_LINE_PROTOCOL == protocol)
                    ? database->sml_precision
                    : TSDB_SML_TIMESTAMP_NOT_CONFIGURED;

            TAOS_RES *smlRes = taos_schemaless_insert(
                    pThreadInfo->conn->taos, pThreadInfo->lines,
                    lineCount, wireProtocol, wirePrecision);
            code = taos_errno(smlRes);

            // this branch takes its retry count from the super table only
            trying = stbInfo->keep_trying;
            while (code && trying && !g_arguments->terminate) {
                taos_free_result(smlRes);
                infoPrint("will sleep %"PRIu32" milliseconds then re-insert\n",
                          trying_interval);
                toolsMsleep(trying_interval);
                smlRes = taos_schemaless_insert(
                        pThreadInfo->conn->taos, pThreadInfo->lines,
                        lineCount, wireProtocol, wirePrecision);
                code = taos_errno(smlRes);
                if (-1 != trying) {
                    --trying;
                }
            }

            if (code != TSDB_CODE_SUCCESS && !g_arguments->terminate) {
                debugPrint("Failed to execute "
                           "schemaless insert content: %s\n\n",
                        pThreadInfo->lines?(pThreadInfo->lines[0]?
                            pThreadInfo->lines[0]:""):"");
                errorPrint(
                    "failed to execute schemaless insert. "
                        "code: 0x%08x reason: %s\n\n",
                        code, taos_errstr(smlRes));
            }
            taos_free_result(smlRes);
            break;
        }

        case SML_REST_IFACE: {
            if (TSDB_SML_JSON_PROTOCOL == protocol
                    || SML_JSON_TAOS_FORMAT == protocol) {
                // JSON payloads are posted as a single document
                code = postProcessSql(pThreadInfo->lines[0], database->dbName,
                                      database->precision, stbInfo->iface,
                                      protocol, g_arguments->port,
                                      stbInfo->tcpTransfer,
                                      pThreadInfo->sockfd, pThreadInfo->filePath);
                break;
            }

            // other protocols: join the non-empty lines into the SQL buffer
            int len = 0;
            for (int i = 0; i < k; i++) {
                if (0 == strlen(pThreadInfo->lines[i])) {
                    break;
                }

                int n;
                if (TSDB_SML_TELNET_PROTOCOL == protocol
                        && stbInfo->tcpTransfer) {
                    n = snprintf(pThreadInfo->buffer + len,
                                 TSDB_MAX_ALLOWED_SQL_LEN - len,
                                 "put %s\n", pThreadInfo->lines[i]);
                } else {
                    n = snprintf(pThreadInfo->buffer + len,
                                 TSDB_MAX_ALLOWED_SQL_LEN - len,
                                 "%s\n",
                                 pThreadInfo->lines[i]);
                }

                if (n < 0 || n >= TSDB_MAX_ALLOWED_SQL_LEN - len) {
                    errorPrint("%s() LN%d snprintf overflow on %d\n",
                        __func__, __LINE__, i);
                    break;
                }
                len += n;
            }
            if (g_arguments->terminate) {
                break;
            }
            code = postProcessSql(pThreadInfo->buffer, database->dbName,
                                  database->precision,
                                  stbInfo->iface, protocol,
                                  g_arguments->port,
                                  stbInfo->tcpTransfer,
                                  pThreadInfo->sockfd, pThreadInfo->filePath);
            break;
        }
    }
    return code;
}
1512

1513
static int smartContinueIfFail(threadInfo *pThreadInfo,
×
1514
                               SChildTable *childTbl,
1515
                               char *tagData,
1516
                               int64_t i,
1517
                               char *ttl) {
1518
    SDataBase *  database = pThreadInfo->dbInfo;
×
1519
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
×
1520
    char *buffer =
1521
        benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, false);
×
1522
    snprintf(
×
1523
            buffer, TSDB_MAX_ALLOWED_SQL_LEN,
1524
            g_arguments->escape_character ?
×
1525
                "CREATE TABLE IF NOT EXISTS `%s`.`%s` USING `%s`.`%s` TAGS (%s) %s "
1526
                : "CREATE TABLE IF NOT EXISTS %s.%s USING %s.%s TAGS (%s) %s ",
1527
            database->dbName, childTbl->name, database->dbName,
1528
            stbInfo->stbName,
1529
            tagData + i * stbInfo->lenOfTags, ttl);
×
1530
    debugPrint("creating table: %s\n", buffer);
×
1531

1532
    int ret;
1533
    if (REST_IFACE == stbInfo->iface) {
×
1534
        ret = queryDbExecRest(buffer,
×
1535
                              database->dbName,
1536
                              database->precision,
1537
                              stbInfo->iface,
×
1538
                              stbInfo->lineProtocol,
×
1539
                              stbInfo->tcpTransfer,
×
1540
                              pThreadInfo->sockfd);
1541
    } else {
1542
        ret = queryDbExecCall(pThreadInfo->conn, buffer);
×
1543
        int32_t trying = g_arguments->keep_trying;
×
1544
        while (ret && trying) {
×
1545
            infoPrint("will sleep %"PRIu32" milliseconds then "
×
1546
                      "re-create table %s\n",
1547
                      g_arguments->trying_interval, buffer);
1548
            toolsMsleep(g_arguments->trying_interval);
×
1549
            ret = queryDbExecCall(pThreadInfo->conn, buffer);
×
1550
            if (trying != -1) {
×
1551
                trying--;
×
1552
            }
1553
        }
1554
    }
1555

1556
    tmfree(buffer);
×
1557

1558
    return ret;
×
1559
}
1560

1561
// Release the per-thread JSON array (if any) and print the insert summary
// for this thread.
//
// pThreadInfo : worker context; nothing happens when it is NULL.
// mode        : label printed in the summary line.
//
// A totalDelay of 0 is replaced by 1 before the rate is computed so the
// division below never divides by zero.
static void cleanupAndPrint(threadInfo *pThreadInfo, char *mode) {
    if (NULL == pThreadInfo) {
        return;
    }

    if (pThreadInfo->json_array) {
        tools_cJSON_Delete(pThreadInfo->json_array);
        pThreadInfo->json_array = NULL;
    }

    if (0 == pThreadInfo->totalDelay) {
        pThreadInfo->totalDelay = 1;
    }

    // totalDelay is divided by 1E6 to obtain the per-second rate
    double elapsed       = (double)pThreadInfo->totalDelay / 1E6;
    double rowsPerSecond = (double)(pThreadInfo->totalInsertRows / elapsed);

    succPrint(
        "thread[%d] %s mode, completed total inserted rows: %" PRIu64
        ", %.2f records/second\n",
        pThreadInfo->threadID,
        mode,
        pThreadInfo->totalInsertRows,
        rowsPerSecond);
}
1,208✔
1580

1581
// Decide whether the next row should get an out-of-order timestamp.
//
// stbInfo       : super table settings (disorderRatio, disorderRange,
//                 startTimestamp).
// disorderRange : in/out countdown owned by the caller; decremented each time
//                 a disorder timestamp is produced and reset to
//                 stbInfo->disorderRange when it reaches 0.
//
// Returns 0 when no disorder timestamp is generated (disorderRatio <= 0 or
// the random draw is not below the ratio); otherwise returns
// startTimestamp - *disorderRange.
static int64_t getDisorderTs(SSuperTable *stbInfo, int *disorderRange) {
    if (stbInfo->disorderRatio <= 0) {
        return 0;
    }

    // draw in [0, 100) and compare against the configured percentage
    int rand_num = taosRandom() % 100;
    if (rand_num >= stbInfo->disorderRatio) {
        return 0;
    }

    *disorderRange -= 1;
    if (0 == *disorderRange) {
        *disorderRange = stbInfo->disorderRange;
    }

    int64_t disorderTs = stbInfo->startTimestamp - *disorderRange;
    debugPrint("rand_num: %d, < disorderRatio: %d, "
               "disorderTs: %"PRId64"\n",
               rand_num, stbInfo->disorderRatio,
               disorderTs);
    return disorderTs;
}
1600

1601
// Hand one comma-separated "db.table" list to taos_load_table_info().
// A failure is only logged; the caller carries on, as the original code did
// when it ignored the return value.
static void preloadTableList(threadInfo *pThreadInfo, const char *tableList) {
    int code = taos_load_table_info(pThreadInfo->conn->taos, tableList);
    if (code != 0) {
        errorPrint("taos_load_table_info() failed, code: %d, reason: %s\n",
                   code, taos_errstr(NULL));
    }
}

// Pre-load the meta of this thread's child tables through
// taos_load_table_info() when g_arguments->pre_load_tb_meta is set.
//
// Table names in [start_table_from, end_table_to) are joined as
// "db.tb1,db.tb2,..." into a 100 KiB buffer, which is flushed whenever it is
// nearly full and once more after the last table. The elapsed time is added
// to pThreadInfo->totalDelay.
//
// NOTE(review): other code in this file treats end_table_to as inclusive
// (tableSeq > end_table_to); the half-open range here is kept unchanged -
// confirm whether the last table is skipped on purpose.
void loadChildTableInfo(threadInfo* pThreadInfo) {
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
    if (!g_arguments->pre_load_tb_meta) {
        return;
    }
    // the native handle is required; the wrapper alone is not a TAOS handle
    if (pThreadInfo->conn == NULL || pThreadInfo->conn->taos == NULL) {
        return;
    }

    char   *db  = pThreadInfo->dbInfo->dbName;
    int64_t cnt = pThreadInfo->end_table_to - pThreadInfo->start_table_from;

    // 100k
    int   bufLen = 100 * 1024;
    char *buf    = benchCalloc(1, bufLen, false);
    int   pos    = 0;
    infoPrint("start load child tables(%"PRId64") info...\n", cnt);
    int64_t start = toolsGetTimestampUs();
    for (int64_t i = pThreadInfo->start_table_from; i < pThreadInfo->end_table_to; i++) {
        SChildTable *childTbl = stbInfo->childTblArray[i];

        // separator only between entries, so the list never starts with ','
        int n = snprintf(buf + pos, (size_t)(bufLen - pos), "%s%s.%s",
                         pos > 0 ? "," : "", db, childTbl->name);
        if (n < 0 || n >= bufLen - pos) {
            // entry did not fit: drop the partial text, flush what is
            // pending and retry with an empty buffer
            buf[pos] = '\0';
            if (pos > 0) {
                preloadTableList(pThreadInfo, buf);
                pos = 0;
            }
            n = snprintf(buf, (size_t)bufLen, "%s.%s", db, childTbl->name);
            if (n < 0 || n >= bufLen) {
                errorPrint("%s() LN%d table name too long to preload: %s.%s\n",
                           __func__, __LINE__, db, childTbl->name);
                buf[0] = '\0';
                continue;
            }
        }
        pos += n;

        if (pos >= bufLen - 256 || i + 1 == pThreadInfo->end_table_to) {
            preloadTableList(pThreadInfo, buf);
            pos = 0;
        }
    }
    int64_t delay = toolsGetTimestampUs() - start;
    infoPrint("end load child tables info. delay=%.2fs\n", delay/1E6);
    pThreadInfo->totalDelay += delay;

    tmfree(buf);
}
1634

1635
// create conn again
1636
// Re-establish a usable stmt2 handle for this thread.
//
// Step 1: if a connection still exists, close its stmt2 and try to init a new
//         stmt2 on the same connection; return 0 if that works.
// Step 2: otherwise close the connection, open a new one, init stmt2 on it and
//         select the database again.
//
// Returns 0 on success, -1 on failure. After a failed initBenchConn()
// pThreadInfo->conn is left NULL, so a later retry enters step 2 directly.
int32_t reCreateConn(threadInfo * pThreadInfo) {
    // "single" is passed through to initStmt2(); it is false when the
    // database has more than one super table
    bool single = true;
    if (pThreadInfo->dbInfo->superTbls->size > 1) {
        single = false;
    }

    // A previous failed attempt may have left conn == NULL; only touch the
    // old connection when there is one.
    if (pThreadInfo->conn != NULL) {
        // stmt2 close
        if (pThreadInfo->conn->stmt2) {
            taos_stmt2_close(pThreadInfo->conn->stmt2);
            pThreadInfo->conn->stmt2 = NULL;
        }

        // retry stmt2 init on the existing connection, maybe success
        pThreadInfo->conn->stmt2 = initStmt2(pThreadInfo->conn->taos, single);
        if (pThreadInfo->conn->stmt2) {
            succPrint("%s", "reCreateConn first taos_stmt2_init() success and return.\n");
            return 0;
        }

        // close old
        closeBenchConn(pThreadInfo->conn);
        pThreadInfo->conn = NULL;
    }

    //
    // create new
    //

    // conn
    pThreadInfo->conn = initBenchConn(pThreadInfo->dbInfo->dbName);
    if (pThreadInfo->conn == NULL) {
        errorPrint("%s", "reCreateConn initBenchConn failed.");
        return -1;
    }
    // stmt2
    pThreadInfo->conn->stmt2 = initStmt2(pThreadInfo->conn->taos, single);
    if (NULL == pThreadInfo->conn->stmt2) {
        errorPrint("reCreateConn taos_stmt2_init() failed, reason: %s\n", taos_errstr(NULL));
        return -1;
    }

    succPrint("%s", "reCreateConn second taos_stmt2_init() success.\n");
    // select db
    if (taos_select_db(pThreadInfo->conn->taos, pThreadInfo->dbInfo->dbName)) {
        errorPrint("second taos select database(%s) failed\n", pThreadInfo->dbInfo->dbName);
        return -1;
    }

    return 0;
}
1692

1693
// reinit
1694
// Rebuild the stmt2 handle via reCreateConn() and prepare it again.
//
// w is forwarded unchanged to prepareStmt2().
// Returns 0 on success, otherwise the non-zero code of the step that failed.
int32_t reConnectStmt2(threadInfo * pThreadInfo, int32_t w) {
    int32_t code = reCreateConn(pThreadInfo);
    if (0 == code) {
        // the connection is usable again, so the statement can be prepared
        code = prepareStmt2(pThreadInfo->conn->stmt2, pThreadInfo->stbInfo, NULL, w,
                            pThreadInfo->dbInfo->dbName);
    }
    return code;
}
1709

1710
// Bind the prepared batch to the thread's stmt2 handle and execute it once.
//
// delay1    : accumulates the time spent in taos_stmt2_bind_param()
//             (including the debug print that follows it).
// delay3    : forwarded to execInsert().
// startTs   : set to the time just before execInsert().
// endTs     : set to the time just after execInsert().
// generated : row count forwarded to execInsert().
//
// Returns the bind error code if binding fails (startTs/endTs are then left
// untouched), otherwise the result of execInsert().
int32_t submitStmt2Impl(threadInfo * pThreadInfo, TAOS_STMT2_BINDV *bindv, int64_t *delay1, int64_t *delay3,
                    int64_t* startTs, int64_t* endTs, uint32_t* generated) {
    int64_t bindBegin = toolsGetTimestampUs();
    int32_t ret = taos_stmt2_bind_param(pThreadInfo->conn->stmt2, bindv, -1);
    if (0 == ret) {
        debugPrint("interlace taos_stmt2_bind_param() ok.  bindv->count=%d \n", bindv->count);
        *delay1 += toolsGetTimestampUs() - bindBegin;

        // execute, recording the wall-clock window around it
        *startTs = toolsGetTimestampUs();
        ret = execInsert(pThreadInfo, *generated, delay3);
        *endTs = toolsGetTimestampUs();
    } else {
        errorPrint("taos_stmt2_bind_param failed, reason: %s\n", taos_stmt2_error(pThreadInfo->conn->stmt2));
    }
    return ret;
}
1728

1729
// Submit one stmt2 batch, retrying with a reconnect when it fails.
//
// Attempts: 1 by default. When stbInfo->continueIfFail == YES_IF_FAILED the
// number of attempts is stbInfo->keep_trying if that is > 1, else 3.
// Between attempts the thread sleeps stbInfo->trying_interval ms and calls
// reConnectStmt2(); if the reconnect fails, the next attempt skips the
// submit and goes straight to another reconnect.
//
// delay1/delay3/startTs/endTs/generated are forwarded to submitStmt2Impl();
// w is forwarded to reConnectStmt2().
//
// Returns 0 on success, -1 once all attempts are used up.
int32_t submitStmt2(threadInfo * pThreadInfo, TAOS_STMT2_BINDV *bindv, int64_t *delay1, int64_t *delay3,
                    int64_t* startTs, int64_t* endTs, uint32_t* generated, int32_t w) {
    // calc loop
    int32_t loop = 1;
    SSuperTable* stbInfo = pThreadInfo->stbInfo;
    if (stbInfo->continueIfFail == YES_IF_FAILED) {
        if (stbInfo->keep_trying > 1) {
            loop = stbInfo->keep_trying;
        } else {
            loop = 3; // default
        }
    }

    // submit stmt2
    int32_t i = 0;          // number of retries started so far
    bool connected = true;  // false while the last reconnect attempt failed
    while (1) {
        int32_t code = -1;
        if (connected) {
            // connection is usable, do the submit
            code = submitStmt2Impl(pThreadInfo, bindv, delay1, delay3, startTs, endTs, generated);
        }

        if (code == 0) {
            // success
            break;
        }

        // failed, check whether another attempt is allowed
        if (--loop == 0) {
            // failed finally
            char tip[64] = "";
            if (i > 0) {
                snprintf(tip, sizeof(tip), " after retry %d", i);
            }
            errorPrint("finally faild execute submitStmt2()%s\n", tip);
            return -1;
        }

        // wait a moment before trying again
        toolsMsleep(stbInfo->trying_interval);
        // The counter is advanced outside the logging macro so the result
        // does not depend on how often the macro evaluates its arguments.
        infoPrint("stmt2 start retry submit i=%d  after sleep %d ms...\n", i, stbInfo->trying_interval);
        i++;

        // reinit
        code = reConnectStmt2(pThreadInfo, w);
        if (code != 0) {
            // failed, try again on the next iteration
            errorPrint("faild reConnectStmt2 and retry again for next i=%d \n", i);
            connected = false;
        } else {
            connected = true;
        }
    }

    // success
    return 0;
}
1787

1788
static void *syncWriteInterlace(void *sarg) {
107✔
1789
    threadInfo * pThreadInfo = (threadInfo *)sarg;
107✔
1790
    SDataBase *  database = pThreadInfo->dbInfo;
107✔
1791
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
107✔
1792
    infoPrint(
107✔
1793
              "thread[%d] start interlace inserting into table from "
1794
              "%" PRIu64 " to %" PRIu64 "\n",
1795
              pThreadInfo->threadID, pThreadInfo->start_table_from,
1796
              pThreadInfo->end_table_to);
1797

1798
    int64_t insertRows = stbInfo->insertRows;
107✔
1799
    int32_t interlaceRows = stbInfo->interlaceRows;
107✔
1800
    uint32_t nBatchTable  = g_arguments->reqPerReq / interlaceRows;
107✔
1801
    uint64_t   lastPrintTime = toolsGetTimestampMs();
107✔
1802
    uint64_t   lastTotalInsertRows = 0;
107✔
1803
    int64_t   startTs = toolsGetTimestampUs();
107✔
1804
    int64_t   endTs;
1805
    uint64_t   tableSeq = pThreadInfo->start_table_from;
107✔
1806
    int disorderRange = stbInfo->disorderRange;
107✔
1807
    int32_t i = 0;
107✔
1808

1809
    loadChildTableInfo(pThreadInfo);
107✔
1810
    // check if filling back mode
1811
    bool fillBack = false;
107✔
1812
    if(stbInfo->useNow && stbInfo->startFillbackTime) {
107!
1813
        fillBack = true;
×
1814
        pThreadInfo->start_time = stbInfo->startFillbackTime;
×
1815
        infoPrint("start time change to startFillbackTime = %"PRId64" \n", pThreadInfo->start_time);
×
1816
    }
1817

1818
    FILE* csvFile = NULL;
107✔
1819
    char* tagData = NULL;
107✔
1820
    int   w       = 0; // record tags position, if w > TAG_BATCH_COUNT , need recreate new tag values
107✔
1821
    if (stbInfo->autoTblCreating) {
107✔
1822
        csvFile = openTagCsv(stbInfo, pThreadInfo->start_table_from);
5✔
1823
        tagData = benchCalloc(TAG_BATCH_COUNT, stbInfo->lenOfTags, false);
5✔
1824
    }
1825
    int64_t delay1 = 0;
107✔
1826
    int64_t delay2 = 0;
107✔
1827
    int64_t delay3 = 0;
107✔
1828
    bool    firstInsertTb = true;
107✔
1829

1830
    TAOS_STMT2_BINDV *bindv = NULL;
107✔
1831

1832
    // create bindv
1833
    if(stbInfo->iface == STMT2_IFACE) {
107✔
1834
        int32_t tagCnt = stbInfo->autoTblCreating ? stbInfo->tags->size : 0;
27!
1835
        if (csvFile) {
27!
1836
            tagCnt = 0;
×
1837
        }
1838
        //int32_t tagCnt = stbInfo->tags->size;
1839
        bindv = createBindV(nBatchTable,  tagCnt, stbInfo->cols->size + 1);
27✔
1840
    }
1841

1842
    bool oldInitStmt = stbInfo->autoTblCreating;
107✔
1843
    // not auto create table call once
1844
    if(stbInfo->iface == STMT_IFACE && !oldInitStmt) {
107!
1845
        debugPrint("call prepareStmt for stable:%s\n", stbInfo->stbName);
23!
1846
        if (prepareStmt(pThreadInfo->conn->stmt, stbInfo, tagData, w, database->dbName)) {
23!
1847
            g_fail = true;
×
1848
            goto free_of_interlace;
×
1849
        }
1850
    }
1851
    else if (stbInfo->iface == STMT2_IFACE) {
84✔
1852
        // only prepare once
1853
        if (prepareStmt2(pThreadInfo->conn->stmt2, stbInfo, NULL, w, database->dbName)) {
27!
1854
            g_fail = true;
×
1855
            goto free_of_interlace;
×
1856
        }
1857
    }    
1858
    int64_t index = tableSeq;
107✔
1859
    while (insertRows > 0) {
36,610✔
1860
        int64_t tmp_total_insert_rows = 0;
36,498✔
1861
        uint32_t generated = 0;
36,498✔
1862
        if (insertRows <= interlaceRows) {
36,498✔
1863
            interlaceRows = insertRows;
122✔
1864
        }
1865

1866
        // loop each table
1867
        for (i = 0; i < nBatchTable; i++) {
51,434✔
1868
            if (g_arguments->terminate) {
51,393!
1869
                goto free_of_interlace;
×
1870
            }
1871
            int64_t pos = pThreadInfo->pos;
51,393✔
1872
            
1873
            // get childTable
1874
            SChildTable *childTbl;
1875
            if (g_arguments->bind_vgroup) {
51,393!
1876
                childTbl = pThreadInfo->vg->childTblArray[tableSeq];
×
1877
            } else {
1878
                childTbl = stbInfo->childTblArray[tableSeq];
51,393✔
1879
            }
1880
            
1881
            char*  tableName   = childTbl->name;
51,393✔
1882
            char *sampleDataBuf = childTbl->useOwnSample?
102,786✔
1883
                                        childTbl->sampleDataBuf:
51,393!
1884
                                        stbInfo->sampleDataBuf;
1885
            // init ts
1886
            if(childTbl->ts == 0) {
51,393✔
1887
               childTbl->ts = pThreadInfo->start_time;
737✔
1888
            }
1889
            char ttl[SMALL_BUFF_LEN] = "";
51,393✔
1890
            if (stbInfo->ttl != 0) {
51,393✔
1891
                snprintf(ttl, SMALL_BUFF_LEN, "TTL %d", stbInfo->ttl);
32✔
1892
            }
1893
            switch (stbInfo->iface) {
51,393!
1894
                case REST_IFACE:
17,145✔
1895
                case TAOSC_IFACE: {
1896
                    char escapedTbName[TSDB_TABLE_NAME_LEN+2] = "\0";
17,145✔
1897
                    if (g_arguments->escape_character) {
17,145✔
1898
                        snprintf(escapedTbName, TSDB_TABLE_NAME_LEN+2, "`%s`",
2,030✔
1899
                                tableName);
1900
                    } else {
1901
                        snprintf(escapedTbName, TSDB_TABLE_NAME_LEN+2, "%s",
15,115✔
1902
                                tableName);
1903
                    }
1904
                    if (i == 0) {
17,145✔
1905
                        ds_add_str(&pThreadInfo->buffer, STR_INSERT_INTO);
12,188✔
1906
                    }
1907

1908
                    // generator
1909
                    if (stbInfo->autoTblCreating && w == 0) {
17,147✔
1910
                        if(!generateTagData(stbInfo, tagData, TAG_BATCH_COUNT, csvFile, NULL, index)) {
1!
1911
                            goto free_of_interlace;
×
1912
                        }
1913
                    }
1914

1915
                    // create child table
1916
                    if (stbInfo->partialColNum == stbInfo->cols->size) {
17,147✔
1917
                        if (stbInfo->autoTblCreating) {
17,115!
1918
                            ds_add_strs(&pThreadInfo->buffer, 8,
×
1919
                                    escapedTbName,
1920
                                    " USING `",
1921
                                    stbInfo->stbName,
1922
                                    "` TAGS (",
1923
                                    tagData + stbInfo->lenOfTags * w,
×
1924
                                    ") ", ttl, " VALUES ");
1925
                        } else {
1926
                            ds_add_strs(&pThreadInfo->buffer, 2,
17,115✔
1927
                                    escapedTbName, " VALUES ");
1928
                        }
1929
                    } else {
1930
                        if (stbInfo->autoTblCreating) {
32!
1931
                            ds_add_strs(&pThreadInfo->buffer, 10,
32✔
1932
                                        escapedTbName,
1933
                                        " (",
1934
                                        stbInfo->partialColNameBuf,
1935
                                        ") USING `",
1936
                                        stbInfo->stbName,
1937
                                        "` TAGS (",
1938
                                        tagData + stbInfo->lenOfTags * w,
32✔
1939
                                        ") ", ttl, " VALUES ");
1940
                        } else {
1941
                            ds_add_strs(&pThreadInfo->buffer, 4,
×
1942
                                        escapedTbName,
1943
                                        "(",
1944
                                        stbInfo->partialColNameBuf,
1945
                                        ") VALUES ");
1946
                        }
1947
                    }
1948

1949
                    // move next
1950
                    if (stbInfo->autoTblCreating && ++w >= TAG_BATCH_COUNT) {
17,139!
1951
                        // reset for gen again
1952
                        w = 0;
×
1953
                        index += TAG_BATCH_COUNT;
×
1954
                    }  
1955

1956
                    // write child data with interlaceRows
1957
                    for (int64_t j = 0; j < interlaceRows; j++) {
39,406✔
1958
                        int64_t disorderTs = getDisorderTs(stbInfo,
22,257✔
1959
                                &disorderRange);
1960

1961
                        // change fillBack mode with condition
1962
                        if(fillBack) {
22,257!
1963
                            int64_t tsnow = toolsGetTimestamp(database->precision);
×
1964
                            if(childTbl->ts >= tsnow){
×
1965
                                fillBack = false;
×
1966
                                infoPrint("fillBack mode set end. because timestamp(%"PRId64") >= now(%"PRId64")\n", childTbl->ts, tsnow);
×
1967
                            }
1968
                        }
1969

1970
                        // timestamp         
1971
                        char time_string[BIGINT_BUFF_LEN];
1972
                        if(stbInfo->useNow && stbInfo->interlaceRows == 1 && !fillBack) {
22,259!
1973
                            int64_t now = toolsGetTimestamp(database->precision);
×
1974
                            snprintf(time_string, BIGINT_BUFF_LEN, "%"PRId64"", now);
×
1975
                        } else {
1976
                            snprintf(time_string, BIGINT_BUFF_LEN, "%"PRId64"",
22,259!
1977
                                    disorderTs?disorderTs:childTbl->ts);
1978
                        }
1979

1980
                        // combine rows timestamp | other cols = sampleDataBuf[pos]
1981
                        if(stbInfo->useSampleTs) {
22,259!
1982
                            ds_add_strs(&pThreadInfo->buffer, 3, "(", 
×
1983
                                        sampleDataBuf + pos * stbInfo->lenOfCols, ") ");
×
1984
                        } else {
1985
                            ds_add_strs(&pThreadInfo->buffer, 5, "(", time_string, ",",
22,259✔
1986
                                        sampleDataBuf + pos * stbInfo->lenOfCols, ") ");
22,259✔
1987
                        }
1988
                        // check buffer enough
1989
                        if (ds_len(pThreadInfo->buffer)
22,269✔
1990
                                > stbInfo->max_sql_len) {
22,270!
1991
                            errorPrint("sql buffer length (%"PRIu64") "
×
1992
                                    "is larger than max sql length "
1993
                                    "(%"PRId64")\n",
1994
                                    ds_len(pThreadInfo->buffer),
1995
                                    stbInfo->max_sql_len);
1996
                            goto free_of_interlace;
×
1997
                        }
1998

1999
                        // move next
2000
                        generated++;
22,270✔
2001
                        pos++;
22,270✔
2002
                        if (pos >= g_arguments->prepared_rand) {
22,270!
2003
                            pos = 0;
×
2004
                        }
2005
                        if(stbInfo->primary_key)
22,270!
2006
                            debugPrint("add child=%s %"PRId64" pk cur=%d cnt=%d \n", childTbl->name, childTbl->ts, childTbl->pkCur, childTbl->pkCnt);
×
2007

2008
                        // primary key
2009
                        if (!stbInfo->primary_key || needChangeTs(stbInfo, &childTbl->pkCur, &childTbl->pkCnt)) {
22,270!
2010
                            childTbl->ts += stbInfo->timestamp_step;
22,267✔
2011
                            if(stbInfo->primary_key)
22,267!
2012
                                debugPrint("changedTs child=%s %"PRId64" pk cur=%d cnt=%d \n", childTbl->name, childTbl->ts, childTbl->pkCur, childTbl->pkCnt);
×
2013
                        }
2014
                        
2015
                    }
2016
                    break;
17,149✔
2017
                }
2018
                case STMT_IFACE: {
15,175✔
2019
                    char escapedTbName[TSDB_TABLE_NAME_LEN+2] = "\0";
15,175✔
2020
                    if (g_arguments->escape_character) {
15,175!
2021
                        snprintf(escapedTbName, TSDB_TABLE_NAME_LEN+2,
×
2022
                                "`%s`", tableName);
2023
                    } else {
2024
                        snprintf(escapedTbName, TSDB_TABLE_NAME_LEN, "%s",
15,175✔
2025
                                tableName);
2026
                    }
2027

2028
                    // generator
2029
                    if (stbInfo->autoTblCreating && w == 0) {
15,175!
2030
                        if(!generateTagData(stbInfo, tagData, TAG_BATCH_COUNT, csvFile, NULL, index)) {
×
2031
                            goto free_of_interlace;
×
2032
                        }
2033
                    }
2034
                    
2035
                    // old must call prepareStmt for each table
2036
                    if (oldInitStmt) {
15,175!
2037
                        debugPrint("call prepareStmt for stable:%s\n", stbInfo->stbName);
×
2038
                        if (prepareStmt(pThreadInfo->conn->stmt, stbInfo, tagData, w, database->dbName)) {
×
2039
                            g_fail = true;
×
2040
                            goto free_of_interlace;
×
2041
                        }
2042
                    }
2043
      
2044
                    int64_t start = toolsGetTimestampUs();
15,175✔
2045
                    if (taos_stmt_set_tbname(pThreadInfo->conn->stmt,
15,177!
2046
                                             escapedTbName)) {
2047
                        errorPrint(
×
2048
                            "taos_stmt_set_tbname(%s) failed, reason: %s\n",
2049
                            tableName,
2050
                                taos_stmt_errstr(pThreadInfo->conn->stmt));
2051
                        g_fail = true;
×
2052
                        goto free_of_interlace;
×
2053
                    }
2054
                    delay1 += toolsGetTimestampUs() - start;
15,174✔
2055

2056
                    int32_t n = 0;
15,176✔
2057
                    generated += bindParamBatch(pThreadInfo, interlaceRows,
15,176✔
2058
                                       childTbl->ts, pos, childTbl, &childTbl->pkCur, &childTbl->pkCnt, &n, &delay2, &delay3);
2059
                    
2060
                    // move next
2061
                    pos += interlaceRows;
15,177✔
2062
                    if (pos + interlaceRows + 1 >= g_arguments->prepared_rand) {
15,177!
2063
                        pos = 0;
×
2064
                    }
2065
                    childTbl->ts += stbInfo->timestamp_step * n;
15,177✔
2066

2067
                    // move next
2068
                    if (stbInfo->autoTblCreating) {
15,177!
2069
                        w += 1;
×
2070
                        if (w >= TAG_BATCH_COUNT) {
×
2071
                            // reset for gen again
2072
                            w = 0;
×
2073
                            index += TAG_BATCH_COUNT;
×
2074
                        }
2075
                    }
2076

2077
                    break;
15,177✔
2078
                }
2079
                case STMT2_IFACE: {
16,985✔
2080
                    // tbnames
2081
                    bindv->tbnames[i] = childTbl->name;
16,985✔
2082

2083
                    // tags
2084
                    if (stbInfo->autoTblCreating && firstInsertTb) {
16,985!
2085
                        // create
2086
                        if (w == 0) {
×
2087
                            // recreate sample tags
2088
                            if(!generateTagData(stbInfo, tagData, TAG_BATCH_COUNT, csvFile, pThreadInfo->tagsStmt, index)) {
×
2089
                                goto free_of_interlace;
×
2090
                            }
2091
                        }
2092

2093
                        if (csvFile) {
×
2094
                            if (prepareStmt2(pThreadInfo->conn->stmt2, stbInfo, tagData, w, database->dbName)) {
×
2095
                                g_fail = true;
×
2096
                                goto free_of_interlace;
×
2097
                            }
2098
                        }
2099
                    
2100
                        bindVTags(bindv, i, w, pThreadInfo->tagsStmt);
×
2101
                    }
2102

2103
                    // cols
2104
                    int32_t n = 0;
16,985✔
2105
                    generated += bindVColsInterlace(bindv, i, pThreadInfo, interlaceRows, childTbl->ts, pos, 
16,985✔
2106
                                                    childTbl, &childTbl->pkCur, &childTbl->pkCnt, &n);                    
2107
                    // move next
2108
                    pos += interlaceRows;
16,989✔
2109
                    if (pos + interlaceRows + 1 >= g_arguments->prepared_rand) {
16,989!
2110
                        pos = 0;
×
2111
                    }
2112
                    childTbl->ts += stbInfo->timestamp_step * n;
16,989✔
2113
                    if (stbInfo->autoTblCreating) {
16,989!
2114
                        w += 1;
×
2115
                        if (w >= TAG_BATCH_COUNT) {
×
2116
                            // reset for gen again
2117
                            w = 0;
×
2118
                            index += TAG_BATCH_COUNT;
×
2119
                        }
2120
                    }
2121

2122
                    break;
16,989✔
2123
                }
2124
                case SML_REST_IFACE:
2,093✔
2125
                case SML_IFACE: {
2126
                    int protocol = stbInfo->lineProtocol;
2,093✔
2127
                    for (int64_t j = 0; j < interlaceRows; j++) {
4,449✔
2128
                        int64_t disorderTs = getDisorderTs(stbInfo,
2,355✔
2129
                                &disorderRange);
2130
                        if (TSDB_SML_JSON_PROTOCOL == protocol) {
2,355!
2131
                            tools_cJSON *tag = tools_cJSON_Duplicate(
×
2132
                                tools_cJSON_GetArrayItem(
×
2133
                                    pThreadInfo->sml_json_tags,
×
2134
                                    (int)tableSeq -
×
2135
                                        pThreadInfo->start_table_from),
×
2136
                                    true);
2137
                            generateSmlJsonCols(
×
2138
                                pThreadInfo->json_array, tag, stbInfo,
2139
                                database->sml_precision,
×
2140
                                    disorderTs?disorderTs:childTbl->ts);
2141
                        } else if (SML_JSON_TAOS_FORMAT == protocol) {
2,355!
2142
                            tools_cJSON *tag = tools_cJSON_Duplicate(
×
2143
                                tools_cJSON_GetArrayItem(
×
2144
                                    pThreadInfo->sml_json_tags,
×
2145
                                    (int)tableSeq -
×
2146
                                        pThreadInfo->start_table_from),
×
2147
                                    true);
2148
                            generateSmlTaosJsonCols(
×
2149
                                pThreadInfo->json_array, tag, stbInfo,
2150
                                database->sml_precision,
×
2151
                                disorderTs?disorderTs:childTbl->ts);
2152
                        } else if (TSDB_SML_LINE_PROTOCOL == protocol) {
2,355✔
2153
                            snprintf(
2,196✔
2154
                                pThreadInfo->lines[generated],
2,196✔
2155
                                stbInfo->lenOfCols + stbInfo->lenOfTags,
2,196✔
2156
                                "%s %s %" PRId64,
2157
                                pThreadInfo
2158
                                    ->sml_tags[(int)tableSeq -
2,196✔
2159
                                               pThreadInfo->start_table_from],
2,196✔
2160
                                    sampleDataBuf + pos * stbInfo->lenOfCols,
2,196✔
2161
                                disorderTs?disorderTs:childTbl->ts);
2162
                        } else {
2163
                            snprintf(
159✔
2164
                                pThreadInfo->lines[generated],
159✔
2165
                                stbInfo->lenOfCols + stbInfo->lenOfTags,
159✔
2166
                                "%s %" PRId64 " %s %s", stbInfo->stbName,
2167
                                disorderTs?disorderTs:childTbl->ts,
2168
                                    sampleDataBuf + pos * stbInfo->lenOfCols,
159✔
2169
                                pThreadInfo
2170
                                    ->sml_tags[(int)tableSeq -
159✔
2171
                                               pThreadInfo->start_table_from]);
159!
2172
                        }
2173
                        generated++;
2,356✔
2174
                        // primary key
2175
                        if (!stbInfo->primary_key || needChangeTs(stbInfo, &childTbl->pkCur, &childTbl->pkCnt)) {
2,356!
2176
                            childTbl->ts += stbInfo->timestamp_step;
2,356✔
2177
                        }
2178
                    }
2179
                    if (TSDB_SML_JSON_PROTOCOL == protocol
2,094!
2180
                            || SML_JSON_TAOS_FORMAT == protocol) {
2,094!
2181
                        pThreadInfo->lines[0] =
×
2182
                            tools_cJSON_PrintUnformatted(
×
2183
                                pThreadInfo->json_array);
×
2184
                    }
2185
                    break;
2,094✔
2186
                }
2187
            }
2188

2189
            // move to next table in one batch
2190
            tableSeq++;
51,404✔
2191
            tmp_total_insert_rows += interlaceRows;
51,404✔
2192
            if (tableSeq > pThreadInfo->end_table_to) {
51,404✔
2193
                // first insert tables loop is end
2194
                firstInsertTb = false;
36,468✔
2195
                // one tables loop timestamp and pos add 
2196
                tableSeq = pThreadInfo->start_table_from;
36,468✔
2197
                // save    
2198
                pThreadInfo->pos = pos;    
36,468✔
2199
                if (!stbInfo->non_stop) {
36,468!
2200
                    insertRows -= interlaceRows;
36,473✔
2201
                }
2202

2203
                // if fillBack mode , can't sleep
2204
                if (stbInfo->insert_interval > 0 && !fillBack) {
36,468!
2205
                    debugPrint("%s() LN%d, insert_interval: %"PRIu64"\n",
230!
2206
                          __func__, __LINE__, stbInfo->insert_interval);
2207
                    perfPrint("sleep %" PRIu64 " ms\n",
230!
2208
                                     stbInfo->insert_interval);
2209
                    toolsMsleep((int32_t)stbInfo->insert_interval);
230✔
2210
                }
2211

2212
                i++;
36,472✔
2213
                // rectify bind count
2214
                if (bindv && bindv->count != i) {
36,472!
2215
                    bindv->count = i;
×
2216
                }                
2217
                break;
36,472✔
2218
            }
2219
        }
2220

2221
        // exec
2222
        if(stbInfo->iface == STMT2_IFACE) {
36,513✔
2223
            // exec stmt2
2224
            if(g_arguments->debug_print)
12,078!
2225
                showBindV(bindv, stbInfo->tags, stbInfo->cols);
×
2226
            // bind & exec stmt2
2227
            if (submitStmt2(pThreadInfo, bindv, &delay1, &delay3, &startTs, &endTs, &generated, w) != 0) {
12,078!
2228
                g_fail = true;
×
2229
                goto free_of_interlace;
×
2230
            }
2231
        } else {
2232
            // exec other
2233
            startTs = toolsGetTimestampUs();
24,435✔
2234
            if (execInsert(pThreadInfo, generated, &delay3)) {
24,445!
2235
                g_fail = true;
×
2236
                goto free_of_interlace;
×
2237
            }
2238
            endTs = toolsGetTimestampUs();
24,434✔
2239
        }
2240

2241
        debugPrint("execInsert tableIndex=%d left insert rows=%"PRId64" generated=%d\n", i, insertRows, generated);
36,502!
2242
                
2243
        // reset count
2244
        if(bindv) {
36,523✔
2245
            bindv->count = 0;
12,067✔
2246
        }            
2247

2248
        pThreadInfo->totalInsertRows += tmp_total_insert_rows;
36,523✔
2249

2250
        if (g_arguments->terminate) {
36,523!
2251
            goto free_of_interlace;
×
2252
        }
2253

2254
        int protocol = stbInfo->lineProtocol;
36,523✔
2255
        switch (stbInfo->iface) {
36,523✔
2256
            case TAOSC_IFACE:
12,189✔
2257
            case REST_IFACE:
2258
                debugPrint("pThreadInfo->buffer: %s\n",
12,189!
2259
                           pThreadInfo->buffer);
2260
                free_ds(&pThreadInfo->buffer);
12,189✔
2261
                pThreadInfo->buffer = new_ds(0);
12,186✔
2262
                break;
12,181✔
2263
            case SML_REST_IFACE:
16✔
2264
                memset(pThreadInfo->buffer, 0,
16✔
2265
                       g_arguments->reqPerReq * (pThreadInfo->max_sql_len + 1));
16✔
2266
            case SML_IFACE:
148✔
2267
                if (TSDB_SML_JSON_PROTOCOL == protocol
148✔
2268
                        || SML_JSON_TAOS_FORMAT == protocol) {
134!
2269
                    debugPrint("pThreadInfo->lines[0]: %s\n",
14!
2270
                               pThreadInfo->lines[0]);
2271
                    if (pThreadInfo->json_array && !g_arguments->terminate) {
14!
2272
                        tools_cJSON_Delete(pThreadInfo->json_array);
×
2273
                        pThreadInfo->json_array = NULL;
×
2274
                    }
2275
                    pThreadInfo->json_array = tools_cJSON_CreateArray();
14✔
2276
                    if (pThreadInfo->lines && pThreadInfo->lines[0]) {
×
2277
                        tmfree(pThreadInfo->lines[0]);
×
2278
                        pThreadInfo->lines[0] = NULL;
×
2279
                    }
2280
                } else {
2281
                    for (int j = 0; j < generated; j++) {
2,490✔
2282
                        if (pThreadInfo && pThreadInfo->lines
2,356!
2283
                                && !g_arguments->terminate) {
2,356!
2284
                            debugPrint("pThreadInfo->lines[%d]: %s\n", j,
2,356!
2285
                                       pThreadInfo->lines[j]);
2286
                            memset(pThreadInfo->lines[j], 0,
2,356✔
2287
                                   pThreadInfo->max_sql_len);
2288
                        }
2289
                    }
2290
                }
2291
                break;
134✔
2292
            case STMT_IFACE:
12,120✔
2293
                break;
12,120✔
2294
        }
2295

2296
        int64_t delay4 = endTs - startTs;
36,501✔
2297
        int64_t delay = delay1 + delay2 + delay3 + delay4;
36,501✔
2298
        if (delay <=0) {
36,501!
2299
            debugPrint("thread[%d]: startTS: %"PRId64", endTS: %"PRId64"\n",
×
2300
                       pThreadInfo->threadID, startTs, endTs);
2301
        } else {
2302
            perfPrint("insert execution time is %10.2f ms\n",
36,501!
2303
                      delay / 1E6);
2304

2305
            int64_t * pdelay = benchCalloc(1, sizeof(int64_t), false);
36,501✔
2306
            *pdelay = delay;
36,498✔
2307
            if (benchArrayPush(pThreadInfo->delayList, pdelay) == NULL) {
36,498!
2308
                tmfree(pdelay);
×
2309
            }
2310
            pThreadInfo->totalDelay += delay;
36,524✔
2311
            pThreadInfo->totalDelay1 += delay1;
36,524✔
2312
            pThreadInfo->totalDelay2 += delay2;
36,524✔
2313
            pThreadInfo->totalDelay3 += delay3;
36,524✔
2314
        }
2315
        delay1 = delay2 = delay3 = 0;
36,524✔
2316

2317
        int64_t currentPrintTime = toolsGetTimestampMs();
36,524✔
2318
        if (currentPrintTime - lastPrintTime > 30 * 1000) {
36,524!
2319
            infoPrint(
×
2320
                    "thread[%d] has currently inserted rows: %" PRIu64
2321
                    ", peroid insert rate: %.3f rows/s \n",
2322
                    pThreadInfo->threadID, pThreadInfo->totalInsertRows,
2323
                    (double)(pThreadInfo->totalInsertRows - lastTotalInsertRows) * 1000.0/(currentPrintTime - lastPrintTime));
2324
            lastPrintTime = currentPrintTime;
×
2325
                lastTotalInsertRows = pThreadInfo->totalInsertRows;
×
2326
        }
2327
    }
2328

2329
free_of_interlace:
112✔
2330
    cleanupAndPrint(pThreadInfo, "interlace");
112✔
2331
    if(csvFile) {
107!
2332
        fclose(csvFile);
×
2333
    }
2334
    tmfree(tagData);
107✔
2335
    freeBindV(bindv);
107✔
2336
    return NULL;
107✔
2337
}
2338

2339
static int32_t prepareProgressDataStmt(
10,454✔
2340
        threadInfo *pThreadInfo,
2341
        SChildTable *childTbl,
2342
        int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt, int64_t *delay1, int64_t *delay2, int64_t *delay3) {
2343
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
10,454✔
2344
    char escapedTbName[TSDB_TABLE_NAME_LEN + 2] = "\0";
10,454✔
2345
    if (g_arguments->escape_character) {
10,454✔
2346
        snprintf(escapedTbName, TSDB_TABLE_NAME_LEN + 2,
10,231✔
2347
                 "`%s`", childTbl->name);
2348
    } else {
2349
        snprintf(escapedTbName, TSDB_TABLE_NAME_LEN, "%s",
223✔
2350
                 childTbl->name);
2351
    }
2352
    int64_t start = toolsGetTimestampUs();
10,454✔
2353
    if (taos_stmt_set_tbname(pThreadInfo->conn->stmt,
10,455!
2354
                             escapedTbName)) {
2355
        errorPrint(
×
2356
                "taos_stmt_set_tbname(%s) failed,"
2357
                "reason: %s\n", escapedTbName,
2358
                taos_stmt_errstr(pThreadInfo->conn->stmt));
2359
        return -1;
×
2360
    }
2361
    *delay1 = toolsGetTimestampUs() - start;
10,437✔
2362
    int32_t n = 0;
10,437✔
2363
    int64_t pos = i % g_arguments->prepared_rand;
10,437✔
2364
    if (g_arguments->prepared_rand - pos < g_arguments->reqPerReq) {
10,437!
2365
        // remain prepare data less than batch, reset pos to zero
2366
        pos = 0;
×
2367
    }
2368
    int32_t generated = bindParamBatch(
20,873✔
2369
            pThreadInfo,
2370
            (g_arguments->reqPerReq > (stbInfo->insertRows - i))
10,437✔
2371
                ? (stbInfo->insertRows - i)
2372
                : g_arguments->reqPerReq,
10,437✔
2373
            *timestamp, pos, childTbl, pkCur, pkCnt, &n, delay2, delay3);
2374
    *timestamp += n * stbInfo->timestamp_step;
10,436✔
2375
    return generated;
10,436✔
2376
}
2377

2378
/*
 * With a probability driven by stbInfo->disorderRatio (compared against a
 * random value in 0..99), overwrite *timestamp with a value placed before
 * stbInfo->startTimestamp. Otherwise *timestamp is left unchanged.
 */
static void makeTimestampDisorder(
        int64_t *timestamp, SSuperTable *stbInfo) {
    int64_t baseTs = stbInfo->startTimestamp;
    int backOffset = stbInfo->disorderRange;
    int roll = taosRandom() % 100;

    if (roll >= stbInfo->disorderRatio) {
        // this row keeps its ordered timestamp
        return;
    }

    // step back by (disorderRange - 1), falling back to the full range
    // when that would be zero
    backOffset -= 1;
    if (backOffset == 0) {
        backOffset = stbInfo->disorderRange;
    }
    *timestamp = baseTs - backOffset;
    debugPrint("rand_num: %d, < disorderRatio: %d"
               ", ts: %"PRId64"\n",
               roll,
               stbInfo->disorderRatio,
               *timestamp);
}
160✔
2396

2397
static int32_t prepareProgressDataSmlJsonText(
146✔
2398
    threadInfo *pThreadInfo,
2399
    uint64_t tableSeq,
2400
    int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt) {
2401
    // prepareProgressDataSmlJsonText
2402
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
146✔
2403
    int32_t generated = 0;
146✔
2404

2405
    int len = 0;
146✔
2406

2407
    char *line = pThreadInfo->lines[0];
146✔
2408
    uint32_t line_buf_len = pThreadInfo->line_buf_len;
146✔
2409

2410
    strncat(line + len, "[", 2);
146✔
2411
    len += 1;
146✔
2412

2413
    int32_t pos = 0;
146✔
2414
    for (int j = 0; (j < g_arguments->reqPerReq)
146✔
2415
            && !g_arguments->terminate; j++) {
1,523!
2416
        strncat(line + len, "{", 2);
1,451✔
2417
        len += 1;
1,451✔
2418
        int n;
2419
        n = snprintf(line + len, line_buf_len - len,
1,451✔
2420
                 "\"timestamp\":%"PRId64",", *timestamp);
2421
        if (n < 0 || n >= line_buf_len - len) {
1,451!
2422
            errorPrint("%s() LN%d snprintf overflow on %d\n",
×
2423
                       __func__, __LINE__, j);
2424
            return -1;
×
2425
        } else {
2426
            len += n;
1,451✔
2427
        }
2428

2429
        n = snprintf(line + len, line_buf_len - len, "%s",
1,451✔
2430
                        pThreadInfo->sml_json_value_array[tableSeq]);
1,451✔
2431
        if (n < 0 || n >= line_buf_len - len) {
1,451!
2432
            errorPrint("%s() LN%d snprintf overflow on %d\n",
1!
2433
                       __func__, __LINE__, j);
2434
            return -1;
×
2435
        } else {
2436
            len += n;
1,450✔
2437
        }
2438
        n = snprintf(line + len, line_buf_len - len, "\"tags\":%s,",
1,450✔
2439
                       pThreadInfo->sml_tags_json_array[tableSeq]);
1,450✔
2440
        if (n < 0 || n >= line_buf_len - len) {
1,450!
2441
            errorPrint("%s() LN%d snprintf overflow on %d\n",
×
2442
                       __func__, __LINE__, j);
2443
            return -1;
×
2444
        } else {
2445
            len += n;
1,451✔
2446
        }
2447
        n = snprintf(line + len, line_buf_len - len,
1,451✔
2448
                       "\"metric\":\"%s\"}", stbInfo->stbName);
2449
        if (n < 0 || n >= line_buf_len - len) {
1,451!
2450
            errorPrint("%s() LN%d snprintf overflow on %d\n",
×
2451
                       __func__, __LINE__, j);
2452
            return -1;
×
2453
        } else {
2454
            len += n;
1,451✔
2455
        }
2456

2457
        pos++;
1,451✔
2458
        if (pos >= g_arguments->prepared_rand) {
1,451✔
2459
            pos = 0;
145✔
2460
        }
2461

2462
        // primay key repeat ts count
2463
        if (!stbInfo->primary_key || needChangeTs(stbInfo, pkCur, pkCnt)) {
1,451!
2464
            *timestamp += stbInfo->timestamp_step;
1,451✔
2465
        }
2466

2467
        if (stbInfo->disorderRatio > 0) {
1,451!
2468
            makeTimestampDisorder(timestamp, stbInfo);
×
2469
        }
2470
        generated++;
1,451✔
2471
        if (i + generated >= stbInfo->insertRows) {
1,451✔
2472
            break;
74✔
2473
        }
2474
        if ((j+1) < g_arguments->reqPerReq) {
1,377✔
2475
            strncat(line + len, ",", 2);
1,305✔
2476
            len += 1;
1,305✔
2477
        }
2478
    }
2479
    strncat(line + len, "]", 2);
146✔
2480

2481
    debugPrint("%s() LN%d, lines[0]: %s\n",
146!
2482
               __func__, __LINE__, pThreadInfo->lines[0]);
2483
    return generated;
146✔
2484
}
2485

2486
static int32_t prepareProgressDataSmlJson(
145✔
2487
    threadInfo *pThreadInfo,
2488
    uint64_t tableSeq,
2489
    int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt) {
2490
    // prepareProgressDataSmlJson
2491
    SDataBase *  database = pThreadInfo->dbInfo;
145✔
2492
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
145✔
2493
    int32_t generated = 0;
145✔
2494

2495
    int32_t pos = 0;
145✔
2496
    for (int j = 0; (j < g_arguments->reqPerReq)
145✔
2497
            && !g_arguments->terminate; j++) {
1,513!
2498
        tools_cJSON *tag = tools_cJSON_Duplicate(
1,441✔
2499
                tools_cJSON_GetArrayItem(
1,441✔
2500
                    pThreadInfo->sml_json_tags,
1,441✔
2501
                    (int)tableSeq -
1,441✔
2502
                    pThreadInfo->start_table_from),
1,441✔
2503
                true);
2504
        debugPrintJsonNoTime(tag);
1,440!
2505
        generateSmlTaosJsonCols(
1,440✔
2506
                pThreadInfo->json_array, tag, stbInfo,
2507
                database->sml_precision, *timestamp);
1,440✔
2508
        pos++;
1,441✔
2509
        if (pos >= g_arguments->prepared_rand) {
1,441✔
2510
            pos = 0;
144✔
2511
        }
2512

2513
        // primay key repeat ts count
2514
        if (!stbInfo->primary_key || needChangeTs(stbInfo, pkCur, pkCnt)) {
1,441!
2515
            *timestamp += stbInfo->timestamp_step;
1,441✔
2516
        }
2517

2518
        if (stbInfo->disorderRatio > 0) {
1,441!
2519
            makeTimestampDisorder(timestamp, stbInfo);
×
2520
        }
2521
        generated++;
1,441✔
2522
        if (i + generated >= stbInfo->insertRows) {
1,441✔
2523
            break;
73✔
2524
        }
2525
    }
2526

2527
    tmfree(pThreadInfo->lines[0]);
145✔
2528
    pThreadInfo->lines[0] = NULL;
145✔
2529
    pThreadInfo->lines[0] =
290✔
2530
            tools_cJSON_PrintUnformatted(
145✔
2531
                pThreadInfo->json_array);
145✔
2532
    debugPrint("pThreadInfo->lines[0]: %s\n",
145!
2533
                   pThreadInfo->lines[0]);
2534

2535
    return generated;
145✔
2536
}
2537

2538
static int32_t prepareProgressDataSmlLineOrTelnet(
444✔
2539
    threadInfo *pThreadInfo, uint64_t tableSeq, char *sampleDataBuf,
2540
    int64_t *timestamp, uint64_t i, char *ttl, int protocol, int32_t *pkCur, int32_t *pkCnt) {
2541
    // prepareProgressDataSmlLine
2542
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
444✔
2543
    int32_t generated = 0;
444✔
2544

2545
    int32_t pos = 0;
444✔
2546
    for (int j = 0; (j < g_arguments->reqPerReq)
444✔
2547
            && !g_arguments->terminate; j++) {
4,531!
2548
        // table index
2549
        int ti = tableSeq - pThreadInfo->start_table_from;
4,411✔
2550
        if (TSDB_SML_LINE_PROTOCOL == protocol) {
4,411✔
2551
            snprintf(
2,162✔
2552
                    pThreadInfo->lines[j],
2,162✔
2553
                    stbInfo->lenOfCols + stbInfo->lenOfTags,
2,162✔
2554
                    "%s %s %" PRId64,
2555
                    pThreadInfo->sml_tags[ti],
2,162✔
2556
                    sampleDataBuf + pos * stbInfo->lenOfCols,
2,162✔
2557
                    *timestamp);
2558
        } else {
2559
            snprintf(
2,249✔
2560
                    pThreadInfo->lines[j],
2,249✔
2561
                    stbInfo->lenOfCols + stbInfo->lenOfTags,
2,249✔
2562
                    "%s %" PRId64 " %s %s", stbInfo->stbName,
2563
                    *timestamp,
2564
                    sampleDataBuf
2565
                    + pos * stbInfo->lenOfCols,
2,249✔
2566
                    pThreadInfo->sml_tags[ti]);
2,249✔
2567
        }
2568
        //infoPrint("sml prepare j=%d stb=%s sml_tags=%s \n", j, stbInfo->stbName, pThreadInfo->sml_tags[ti]);
2569
        pos++;
4,411✔
2570
        if (pos >= g_arguments->prepared_rand) {
4,411✔
2571
            pos = 0;
225✔
2572
        }
2573
        // primay key repeat ts count
2574
        if (!stbInfo->primary_key || needChangeTs(stbInfo, pkCur, pkCnt)) {
4,411!
2575
            *timestamp += stbInfo->timestamp_step;
4,411✔
2576
        }
2577
        
2578
        if (stbInfo->disorderRatio > 0) {
4,411✔
2579
            makeTimestampDisorder(timestamp, stbInfo);
160✔
2580
        }
2581
        generated++;
4,411✔
2582
        if (i + generated >= stbInfo->insertRows) {
4,411✔
2583
            break;
324✔
2584
        }
2585
    }
2586
    return generated;
444✔
2587
}
2588

2589
static int32_t prepareProgressDataSml(
735✔
2590
    threadInfo *pThreadInfo,
2591
    SChildTable *childTbl,
2592
    uint64_t tableSeq,
2593
    int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt) {
2594
    // prepareProgressDataSml
2595
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
735✔
2596

2597
    char *sampleDataBuf;
2598
    if (childTbl->useOwnSample) {
735!
2599
        sampleDataBuf = childTbl->sampleDataBuf;
×
2600
    } else {
2601
        sampleDataBuf = stbInfo->sampleDataBuf;
735✔
2602
    }
2603
    int protocol = stbInfo->lineProtocol;
735✔
2604
    int32_t generated = -1;
735✔
2605
    switch (protocol) {
735!
2606
        case TSDB_SML_LINE_PROTOCOL:
444✔
2607
        case TSDB_SML_TELNET_PROTOCOL:
2608
            generated = prepareProgressDataSmlLineOrTelnet(
444✔
2609
                    pThreadInfo,
2610
                    tableSeq,
2611
                    sampleDataBuf,
2612
                    timestamp, i, ttl, protocol, pkCur, pkCnt);
2613
            break;
444✔
2614
        case TSDB_SML_JSON_PROTOCOL:
146✔
2615
            generated = prepareProgressDataSmlJsonText(
146✔
2616
                    pThreadInfo,
2617
                    tableSeq - pThreadInfo->start_table_from,
146✔
2618
                timestamp, i, ttl, pkCur, pkCnt);
2619
            break;
146✔
2620
        case SML_JSON_TAOS_FORMAT:
145✔
2621
            generated = prepareProgressDataSmlJson(
145✔
2622
                    pThreadInfo,
2623
                    tableSeq,
2624
                    timestamp, i, ttl, pkCur, pkCnt);
2625
            break;
145✔
2626
        default:
×
2627
            errorPrint("%s() LN%d: unknown protcolor: %d\n",
×
2628
                       __func__, __LINE__, protocol);
2629
            break;
×
2630
    }
2631

2632
    return generated;
735✔
2633
}
2634

2635
// if return true, timestmap must add timestap_step, else timestamp no need changed
2636
bool needChangeTs(SSuperTable * stbInfo, int32_t *pkCur, int32_t *pkCnt) {
×
2637
    // check need generate cnt
2638
    if(*pkCnt == 0) {
×
2639
        if (stbInfo->repeat_ts_min >= stbInfo->repeat_ts_max) {
×
2640
            // fixed count value is max
2641
            if (stbInfo->repeat_ts_max == 0){
×
2642
                return true;
×
2643
            }
2644

2645
            *pkCnt = stbInfo->repeat_ts_max;
×
2646
        } else {
2647
            // random range
2648
            *pkCnt = RD(stbInfo->repeat_ts_max + 1);
×
2649
            if(*pkCnt < stbInfo->repeat_ts_min) {
×
2650
                *pkCnt = (*pkCnt + stbInfo->repeat_ts_min) % stbInfo->repeat_ts_max;
×
2651
            }
2652
        }
2653
    }
2654

2655
    // compare with current value
2656
    *pkCur = *pkCur + 1;
×
2657
    if(*pkCur >= *pkCnt) {
×
2658
        // reset zero
2659
        *pkCur = 0;
×
2660
        *pkCnt = 0;
×
2661
        return true;
×
2662
    } else {
2663
        // add one
2664
        return false;
×
2665
    }
2666
}
2667

2668
static int32_t prepareProgressDataSql(
63,884✔
2669
                    threadInfo *pThreadInfo,
2670
                    SChildTable *childTbl, 
2671
                    char* tagData,
2672
                    uint64_t tableSeq,
2673
                    char *sampleDataBuf,
2674
                    int64_t *timestamp, uint64_t i, char *ttl,
2675
                    int32_t *pos, uint64_t *len, int32_t* pkCur, int32_t* pkCnt) {
2676
    // prepareProgressDataSql
2677
    int32_t generated = 0;
63,884✔
2678
    SDataBase *database = pThreadInfo->dbInfo;
63,884✔
2679
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
63,884✔
2680
    char *  pstr = pThreadInfo->buffer;
63,884✔
2681
    int disorderRange = stbInfo->disorderRange;
63,884✔
2682

2683
    if (stbInfo->partialColNum == stbInfo->cols->size) {
63,884✔
2684
        if (stbInfo->autoTblCreating) {
63,850✔
2685
            *len =
60✔
2686
                snprintf(pstr, TSDB_MAX_ALLOWED_SQL_LEN,
60✔
2687
                        g_arguments->escape_character
60✔
2688
                        ? "%s `%s`.`%s` USING `%s`.`%s` TAGS (%s) %s VALUES "
2689
                        : "%s %s.%s USING %s.%s TAGS (%s) %s VALUES ",
2690
                         STR_INSERT_INTO, database->dbName,
2691
                         childTbl->name, database->dbName,
2692
                         stbInfo->stbName,
2693
                         tagData +
2694
                         stbInfo->lenOfTags * tableSeq, ttl);
60!
2695
        } else {
2696
            *len = snprintf(pstr, TSDB_MAX_ALLOWED_SQL_LEN,
63,790✔
2697
                    g_arguments->escape_character
63,790✔
2698
                           ? "%s `%s`.`%s` VALUES "
2699
                           : "%s %s.%s VALUES ",
2700
                           STR_INSERT_INTO,
2701
                           database->dbName, childTbl->name);
2702
        }
2703
    } else {
2704
        if (stbInfo->autoTblCreating) {
34✔
2705
            *len = snprintf(
16✔
2706
                    pstr, TSDB_MAX_ALLOWED_SQL_LEN,
2707
                    g_arguments->escape_character
16✔
2708
                    ? "%s `%s`.`%s` (%s) USING `%s`.`%s` TAGS (%s) %s VALUES "
2709
                    : "%s %s.%s (%s) USING %s.%s TAGS (%s) %s VALUES ",
2710
                    STR_INSERT_INTO, database->dbName,
2711
                    childTbl->name,
2712
                    stbInfo->partialColNameBuf,
2713
                    database->dbName, stbInfo->stbName,
2714
                    tagData +
2715
                    stbInfo->lenOfTags * tableSeq, ttl);
16!
2716
        } else {
2717
            *len = snprintf(pstr, TSDB_MAX_ALLOWED_SQL_LEN,
18✔
2718
                    g_arguments->escape_character
18!
2719
                    ? "%s `%s`.`%s` (%s) VALUES "
2720
                    : "%s %s.%s (%s) VALUES ",
2721
                    STR_INSERT_INTO, database->dbName,
2722
                    childTbl->name,
2723
                    stbInfo->partialColNameBuf);
2724
        }
2725
    }
2726

2727
    char *ownSampleDataBuf;
2728
    if (childTbl->useOwnSample) {
63,884✔
2729
        debugPrint("%s is using own sample data\n",
6!
2730
                  childTbl->name);
2731
        ownSampleDataBuf = childTbl->sampleDataBuf;
6✔
2732
    } else {
2733
        ownSampleDataBuf = stbInfo->sampleDataBuf;
63,878✔
2734
    }
2735
    for (int j = 0; j < g_arguments->reqPerReq; j++) {
39,425,952!
2736
        if (stbInfo->useSampleTs
39,473,043✔
2737
                && (!stbInfo->random_data_source)) {
104!
2738
            *len +=
104✔
2739
                snprintf(pstr + *len,
104✔
2740
                         TSDB_MAX_ALLOWED_SQL_LEN - *len, "(%s)",
104✔
2741
                         sampleDataBuf +
2742
                         *pos * stbInfo->lenOfCols);
104✔
2743
        } else {
2744
            int64_t disorderTs = getDisorderTs(stbInfo, &disorderRange);
39,472,939✔
2745
            *len += snprintf(pstr + *len,
39,388,915✔
2746
                            TSDB_MAX_ALLOWED_SQL_LEN - *len,
39,388,915✔
2747
                            "(%" PRId64 ",%s)",
2748
                            disorderTs?disorderTs:*timestamp,
2749
                            ownSampleDataBuf +
2750
                            *pos * stbInfo->lenOfCols);
39,388,915✔
2751
        }
2752
        *pos += 1;
39,389,019✔
2753
        if (*pos >= g_arguments->prepared_rand) {
39,389,019✔
2754
            *pos = 0;
4,022✔
2755
        }
2756
        // primary key
2757
        if(!stbInfo->primary_key || needChangeTs(stbInfo, pkCur, pkCnt)) {
39,389,019!
2758
            *timestamp += stbInfo->timestamp_step;
39,474,500✔
2759
        }
2760
   
2761
        generated++;
39,389,019✔
2762
        if (*len > (TSDB_MAX_ALLOWED_SQL_LEN
39,389,019✔
2763
            - stbInfo->lenOfCols)) {
39,389,019✔
2764
            break;
847✔
2765
        }
2766
        if (i + generated >= stbInfo->insertRows) {
39,388,172✔
2767
            break;
26,104✔
2768
        }
2769
    }
2770

2771
    return generated;
×
2772
}
2773

2774
void *syncWriteProgressive(void *sarg) {
1,141✔
2775
    threadInfo * pThreadInfo = (threadInfo *)sarg;
1,141✔
2776
    SDataBase *  database = pThreadInfo->dbInfo;
1,141✔
2777
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
1,141✔
2778

2779
    loadChildTableInfo(pThreadInfo);
1,141✔
2780

2781
    // special deal flow for TAOSC_IFACE
2782
    if (insertDataMix(pThreadInfo, database, stbInfo)) {
1,141✔
2783
        // request be dealt by this function , so return
2784
        return NULL;
40✔
2785
    }
2786

2787
    infoPrint(
1,101✔
2788
        "thread[%d] start progressive inserting into table from "
2789
        "%" PRIu64 " to %" PRIu64 "\n",
2790
        pThreadInfo->threadID, pThreadInfo->start_table_from,
2791
        pThreadInfo->end_table_to + 1);
2792

2793
    uint64_t  lastPrintTime = toolsGetTimestampMs();
1,101✔
2794
    uint64_t  lastTotalInsertRows = 0;
1,101✔
2795
    int64_t   startTs = toolsGetTimestampUs();
1,101✔
2796
    int64_t   endTs;
2797

2798
    FILE* csvFile = NULL;
1,101✔
2799
    char* tagData = NULL;
1,101✔
2800
    bool  stmt    = (stbInfo->iface == STMT_IFACE || stbInfo->iface == STMT2_IFACE) && stbInfo->autoTblCreating;
1,101✔
2801
    bool  smart   = SMART_IF_FAILED == stbInfo->continueIfFail;
1,101✔
2802
    bool  acreate = (stbInfo->iface == TAOSC_IFACE || stbInfo->iface == REST_IFACE) && stbInfo->autoTblCreating;
1,101✔
2803
    int   w       = 0;
1,101✔
2804
    if (stmt || smart || acreate) {
1,101✔
2805
        csvFile = openTagCsv(stbInfo, pThreadInfo->start_table_from);
32✔
2806
        tagData = benchCalloc(TAG_BATCH_COUNT, stbInfo->lenOfTags, false);
32✔
2807
    }
2808

2809
    bool oldInitStmt = stbInfo->autoTblCreating;
1,101✔
2810
    // stmt.  not auto table create call on stmt
2811
    if (stbInfo->iface == STMT_IFACE && !oldInitStmt) {
1,101✔
2812
        if (prepareStmt(pThreadInfo->conn->stmt, stbInfo, tagData, w, database->dbName)) {
69!
2813
            g_fail = true;
×
2814
            goto free_of_progressive;
×
2815
        }
2816
    }
2817
    else if (stbInfo->iface == STMT2_IFACE && !stbInfo->autoTblCreating) {
1,032!
2818
        if (prepareStmt2(pThreadInfo->conn->stmt2, stbInfo, tagData, w, database->dbName)) {
60!
2819
            g_fail = true;
×
2820
            goto free_of_progressive;
×
2821
        }
2822
    }
2823
    
2824
    //
2825
    // loop write each child table
2826
    //
2827
    int16_t index = pThreadInfo->start_table_from;
1,101✔
2828
    for (uint64_t tableSeq = pThreadInfo->start_table_from;
1,101✔
2829
            tableSeq <= pThreadInfo->end_table_to; tableSeq++) {
38,364✔
2830
        char *sampleDataBuf;
2831
        SChildTable *childTbl;
2832

2833
        if (g_arguments->bind_vgroup) {
37,264✔
2834
            childTbl = pThreadInfo->vg->childTblArray[tableSeq];
40✔
2835
        } else {
2836
            childTbl = stbInfo->childTblArray[tableSeq];
37,224✔
2837
        }
2838
        debugPrint("tableSeq=%"PRId64" childTbl->name=%s\n", tableSeq, childTbl->name);
37,264✔
2839

2840
        if (childTbl->useOwnSample) {
37,290✔
2841
            sampleDataBuf = childTbl->sampleDataBuf;
12✔
2842
        } else {
2843
            sampleDataBuf = stbInfo->sampleDataBuf;
37,278✔
2844
        }
2845

2846
        int64_t  timestamp = pThreadInfo->start_time;
37,290✔
2847
        uint64_t len = 0;
37,290✔
2848
        int32_t pos = 0;
37,290✔
2849
        int32_t pkCur = 0; // record generate same timestamp current count
37,290✔
2850
        int32_t pkCnt = 0; // record generate same timestamp count
37,290✔
2851
        int64_t delay1 = 0;
37,290✔
2852
        int64_t delay2 = 0;
37,290✔
2853
        int64_t delay3 = 0;
37,290✔
2854

2855
        if(stmt || smart || acreate) {
37,290✔
2856
            // generator
2857
            if (w == 0) {
10,083✔
2858
                if(!generateTagData(stbInfo, tagData, TAG_BATCH_COUNT, csvFile, NULL, index)) {
122!
2859
                    g_fail = true;
×
2860
                    goto free_of_progressive;
4✔
2861
                }
2862
            }
2863
        }
2864

2865
        // old init stmt must call for each table
2866
        if (stbInfo->iface == STMT_IFACE && oldInitStmt) {
37,290✔
2867
            if (prepareStmt(pThreadInfo->conn->stmt, stbInfo, tagData, w, database->dbName)) {
10,014!
2868
                g_fail = true;
×
2869
                goto free_of_progressive;
×
2870
            }
2871
        }
2872
        else if (stbInfo->iface == STMT2_IFACE && stbInfo->autoTblCreating) {
27,276!
2873
            if (prepareStmt2(pThreadInfo->conn->stmt2, stbInfo, tagData, w, database->dbName)) {
×
2874
                g_fail = true;
×
2875
                goto free_of_progressive;
×
2876
            }
2877
        }
2878
        
2879
        if(stmt || smart || acreate) {
37,292✔
2880
            // move next
2881
            if (++w >= TAG_BATCH_COUNT) {
10,085✔
2882
                // reset for gen again
2883
                w = 0;
100✔
2884
                index += TAG_BATCH_COUNT;
100✔
2885
            } 
2886
        }
2887

2888
        char ttl[SMALL_BUFF_LEN] = "";
37,292✔
2889
        if (stbInfo->ttl != 0) {
37,292✔
2890
            snprintf(ttl, SMALL_BUFF_LEN, "TTL %d", stbInfo->ttl);
24✔
2891
        }
2892
        for (uint64_t i = 0; i < stbInfo->insertRows;) {
75,456✔
2893
            if (g_arguments->terminate) {
75,452!
2894
                goto free_of_progressive;
×
2895
            }
2896
            int32_t generated = 0;
75,452✔
2897
            switch (stbInfo->iface) {
75,452!
2898
                case TAOSC_IFACE:
63,888✔
2899
                case REST_IFACE:                
2900
                    generated = prepareProgressDataSql(
63,888✔
2901
                            pThreadInfo,
2902
                            childTbl,
2903
                            tagData,
2904
                            w,
2905
                            sampleDataBuf,
2906
                            &timestamp, i, ttl, &pos, &len, &pkCur, &pkCnt);
2907
                    break;        
63,896✔
2908
                case STMT_IFACE: {
10,456✔
2909
                    generated = prepareProgressDataStmt(
10,456✔
2910
                            pThreadInfo,
2911
                            childTbl, &timestamp, i, ttl, &pkCur, &pkCnt, &delay1, &delay2, &delay3);
2912
                    break;
10,433✔
2913
                }
2914
                case STMT2_IFACE: {
381✔
2915
                    generated = stmt2BindAndSubmit(
381✔
2916
                            pThreadInfo,
2917
                            childTbl, &timestamp, i, ttl, &pkCur, &pkCnt, &delay1,
2918
                            &delay3, &startTs, &endTs, w);
2919
                    break;
381✔
2920
                }
2921
                case SML_REST_IFACE:
735✔
2922
                case SML_IFACE:
2923
                    generated = prepareProgressDataSml(
735✔
2924
                            pThreadInfo,
2925
                            childTbl,
2926
                            tableSeq, &timestamp, i, ttl, &pkCur, &pkCnt);
2927
                    break;
735✔
2928
                default:
×
2929
                    break;
×
2930
            }
2931
            if (generated < 0) {
75,437!
2932
                g_fail = true;
×
2933
                goto free_of_progressive;
×
2934
            }
2935
            if (!stbInfo->non_stop) {
75,437!
2936
                i += generated;
75,446✔
2937
            }
2938

2939
            // stmt2 execInsert already execute on stmt2BindAndSubmit
2940
            if (stbInfo->iface != STMT2_IFACE) {
75,437✔
2941
                // no stmt2 exec
2942
                startTs = toolsGetTimestampUs();
75,078✔
2943
                int code = execInsert(pThreadInfo, generated, &delay3);
75,079✔
2944
                if (code) {
75,056✔
2945
                    if (NO_IF_FAILED == stbInfo->continueIfFail) {
74✔
2946
                        warnPrint("The super table parameter "
4!
2947
                                "continueIfFail: %d, STOP insertion!\n",
2948
                                stbInfo->continueIfFail);
2949
                        g_fail = true;
4✔
2950
                        goto free_of_progressive;
4✔
2951
                    } else if (YES_IF_FAILED == stbInfo->continueIfFail) {
70!
2952
                        infoPrint("The super table parameter "
70✔
2953
                                "continueIfFail: %d, "
2954
                                "will continue to insert ..\n",
2955
                                stbInfo->continueIfFail);
2956
                    } else if (smart) {
×
2957
                        warnPrint("The super table parameter "
×
2958
                                "continueIfFail: %d, will create table "
2959
                                "then insert ..\n",
2960
                                stbInfo->continueIfFail);
2961

2962
                        // generator
2963
                        if (w == 0) {
×
2964
                            if(!generateTagData(stbInfo, tagData, TAG_BATCH_COUNT, csvFile, NULL, index)) {
×
2965
                                g_fail = true;
×
2966
                                goto free_of_progressive;
×
2967
                            }
2968
                        }
2969

2970
                        code = smartContinueIfFail(
×
2971
                                pThreadInfo,
2972
                                childTbl, tagData, w, ttl);
2973
                        if (0 != code) {
×
2974
                            g_fail = true;
×
2975
                            goto free_of_progressive;
×
2976
                        }
2977

2978
                        // move next
2979
                        if (++w >= TAG_BATCH_COUNT) {
×
2980
                            // reset for gen again
2981
                            w = 0;
×
2982
                            index += TAG_BATCH_COUNT;
×
2983
                        }
2984

2985
                        code = execInsert(pThreadInfo, generated, &delay3);
×
2986
                        if (code) {
×
2987
                            g_fail = true;
×
2988
                            goto free_of_progressive;
×
2989
                        }
2990
                    } else {
2991
                        warnPrint("Unknown super table parameter "
×
2992
                                "continueIfFail: %d\n",
2993
                                stbInfo->continueIfFail);
2994
                        g_fail = true;
×
2995
                        goto free_of_progressive;
×
2996
                    }
2997
                }
2998
                endTs = toolsGetTimestampUs() + 1;
75,052✔
2999
            }
3000

3001
            if (stbInfo->insert_interval > 0) {
75,405✔
3002
                debugPrint("%s() LN%d, insert_interval: %"PRIu64"\n",
400!
3003
                          __func__, __LINE__, stbInfo->insert_interval);
3004
                perfPrint("sleep %" PRIu64 " ms\n",
400!
3005
                              stbInfo->insert_interval);
3006
                toolsMsleep((int32_t)stbInfo->insert_interval);
400✔
3007
            }
3008

3009
            // flush
3010
            if (database->flush) {
75,445✔
3011
                char sql[260] = "";
103✔
3012
                sprintf(sql, "flush database %s", database->dbName);
103✔
3013
                int32_t code = executeSql(pThreadInfo->conn->taos,sql);
103✔
3014
                if (code != 0) {
103!
3015
                  perfPrint(" %s failed. error code = 0x%x\n", sql, code);
×
3016
                } else {
3017
                   perfPrint(" %s ok.\n", sql);
103!
3018
                }
3019
            }
3020

3021
            pThreadInfo->totalInsertRows += generated;
75,445✔
3022

3023
            if (g_arguments->terminate) {
75,445!
3024
                goto free_of_progressive;
×
3025
            }
3026
            int protocol = stbInfo->lineProtocol;
75,445✔
3027
            switch (stbInfo->iface) {
75,445✔
3028
                case REST_IFACE:
63,891✔
3029
                case TAOSC_IFACE:
3030
                    memset(pThreadInfo->buffer, 0, pThreadInfo->max_sql_len);
63,891✔
3031
                    break;
63,891✔
3032
                case SML_REST_IFACE:
21✔
3033
                    memset(pThreadInfo->buffer, 0,
21✔
3034
                           g_arguments->reqPerReq *
21✔
3035
                               (pThreadInfo->max_sql_len + 1));                    
21✔
3036
                case SML_IFACE:
735✔
3037
                    if (TSDB_SML_JSON_PROTOCOL == protocol) {
735✔
3038
                        memset(pThreadInfo->lines[0], 0,
146✔
3039
                           pThreadInfo->line_buf_len);
146✔
3040
                    } else if (SML_JSON_TAOS_FORMAT == protocol) {
589✔
3041
                        if (pThreadInfo->lines && pThreadInfo->lines[0]) {
145!
3042
                            tmfree(pThreadInfo->lines[0]);
145✔
3043
                            pThreadInfo->lines[0] = NULL;
145✔
3044
                        }
3045
                        if (pThreadInfo->json_array) {
145!
3046
                            tools_cJSON_Delete(pThreadInfo->json_array);
145✔
3047
                            pThreadInfo->json_array = NULL;
145✔
3048
                        }
3049
                        pThreadInfo->json_array = tools_cJSON_CreateArray();
145✔
3050
                    } else {
3051
                        for (int j = 0; j < generated; j++) {
4,857✔
3052
                            debugPrint("pThreadInfo->lines[%d]: %s\n",
4,413!
3053
                                       j, pThreadInfo->lines[j]);
3054
                            memset(pThreadInfo->lines[j], 0,
4,413✔
3055
                                   pThreadInfo->max_sql_len);
3056
                        }
3057
                    }
3058
                    break;
735✔
3059
                case STMT_IFACE:
10,441✔
3060
                    break;
10,441✔
3061
            }
3062

3063
            int64_t delay4 = endTs - startTs;
75,445✔
3064
            int64_t delay = delay1 + delay2 + delay3 + delay4;
75,445✔
3065
            if (delay <= 0) {
75,445!
3066
                debugPrint("thread[%d]: startTs: %"PRId64", endTs: %"PRId64"\n",
×
3067
                        pThreadInfo->threadID, startTs, endTs);
3068
            } else {
3069
                perfPrint("insert execution time is %.6f s\n",
75,445!
3070
                              delay / 1E6);
3071

3072
                int64_t * pDelay = benchCalloc(1, sizeof(int64_t), false);
75,445✔
3073
                *pDelay = delay;
75,421✔
3074
                if (benchArrayPush(pThreadInfo->delayList, pDelay) == NULL) {
75,421!
3075
                    tmfree(pDelay);
×
3076
                }
3077
                pThreadInfo->totalDelay += delay;
75,462✔
3078
                pThreadInfo->totalDelay1 += delay1;
75,462✔
3079
                pThreadInfo->totalDelay2 += delay2;
75,462✔
3080
                pThreadInfo->totalDelay3 += delay3;
75,462✔
3081
            }
3082
            delay1 = delay2 = delay3 = 0;
75,462✔
3083

3084
            int64_t currentPrintTime = toolsGetTimestampMs();
75,462✔
3085
            if (currentPrintTime - lastPrintTime > 30 * 1000) {
75,444✔
3086
                infoPrint(
224✔
3087
                        "thread[%d] has currently inserted rows: "
3088
                        "%" PRId64 ", peroid insert rate: %.3f rows/s \n",
3089
                        pThreadInfo->threadID, pThreadInfo->totalInsertRows,
3090
                        (double)(pThreadInfo->totalInsertRows - lastTotalInsertRows) * 1000.0/(currentPrintTime - lastPrintTime));
3091
                lastPrintTime = currentPrintTime;
203✔
3092
                lastTotalInsertRows = pThreadInfo->totalInsertRows;
203✔
3093
            }
3094
            if (i >= stbInfo->insertRows) {
75,423✔
3095
                break;
37,259✔
3096
            }
3097
        }  // insertRows
3098
    }      // tableSeq
3099
free_of_progressive:
1,100✔
3100
    cleanupAndPrint(pThreadInfo, "progressive");
1,104✔
3101
    if(csvFile) {
1,101!
3102
        fclose(csvFile);
×
3103
    }
3104
    tmfree(tagData);
1,101✔
3105
    return NULL;
1,101✔
3106
}
3107

3108
uint64_t strToTimestamp(char * tsStr) {
40✔
3109
    uint64_t ts = 0;
40✔
3110
    // remove double quota mark
3111
    if (tsStr[0] == '\"' || tsStr[0] == '\'') {
40!
3112
        tsStr += 1;
×
3113
        int32_t last = strlen(tsStr) - 1;
×
3114
        if (tsStr[last] == '\"' || tsStr[0] == '\'') {
×
3115
            tsStr[last] = 0;
×
3116
        }
3117
    }
3118

3119
    if (toolsParseTime(tsStr, (int64_t*)&ts, strlen(tsStr), TSDB_TIME_PRECISION_MILLI, 0)) {
40!
3120
        // not timestamp str format, maybe int64 format
3121
        ts = (int64_t)atol(tsStr);
40✔
3122
    }
3123

3124
    return ts;
40✔
3125
}
3126

3127
/*
 * Fill the per-column stmt bind arrays from the textual sample buffer.
 *
 * For each of the first g_arguments->prepared_rand rows, the row text found
 * at sampleDataBuf + lenOfCols * row is split on ',' and every field is
 * converted according to the column type and stored at slot `row` of the
 * column's StmtData (data / is_null / lengths).
 *
 * stbInfo        super table describing the columns
 * childTbl       when non-NULL, the child table's own sample buffer and
 *                child columns are used instead of the super table's
 * bind_ts_array  when stbInfo->useSampleTs is set, the first field of each
 *                row is parsed as a timestamp and stored here
 *
 * Returns 0 on success, -1 when a temporary buffer cannot be allocated.
 */
static int initStmtDataValue(SSuperTable *stbInfo, SChildTable *childTbl, uint64_t *bind_ts_array) {
    int32_t columnCount = stbInfo->cols->size;

    char *sampleDataBuf;
    if (childTbl) {
        sampleDataBuf = childTbl->sampleDataBuf;
    } else {
        sampleDataBuf = stbInfo->sampleDataBuf;
    }
    int64_t lenOfOneRow = stbInfo->lenOfCols;

    if (stbInfo->useSampleTs) {
        columnCount += 1;  // for skipping first column
    }
    for (int i=0; i < g_arguments->prepared_rand; i++) {
        int cursor = 0;  // offset of the current field inside row i

        for (int c = 0; c < columnCount; c++) {
            char *restStr = sampleDataBuf
                + lenOfOneRow * i + cursor;
            int lengthOfRest = strlen(restStr);

            // the field ends at the next ',' or at the end of the row text
            int index = 0;
            for (index = 0; index < lengthOfRest; index++) {
                if (restStr[index] == ',') {
                    break;
                }
            }

            cursor += index + 1;  // skip ',' too

            // calloc'ed with one extra byte, so the copy below stays terminated
            char *tmpStr = calloc(1, index + 1);
            if (NULL == tmpStr) {
                errorPrint("%s() LN%d, Failed to allocate %d bind buffer\n",
                        __func__, __LINE__, index + 1);
                return -1;
            }

            strncpy(tmpStr, restStr, index);
            if ((0 == c) && stbInfo->useSampleTs) {
                // first field is the sample timestamp, it has no column entry
                bind_ts_array[i] = strToTimestamp(tmpStr);
                free(tmpStr);
                continue;
            }

            Field *col = benchArrayGet(stbInfo->cols,
                    (stbInfo->useSampleTs?c-1:c));
            char dataType = col->type;

            StmtData *stmtData;
            if (childTbl) {
                ChildField *childCol =
                    benchArrayGet(childTbl->childCols,
                                  (stbInfo->useSampleTs?c-1:c));
                stmtData = &childCol->stmtData;
            } else {
                stmtData = &col->stmtData;
            }

            // set value
            stmtData->is_null[i] = 0;
            stmtData->lengths[i] = col->length;

            if (0 == strcmp(tmpStr, "NULL")) {
                *(stmtData->is_null + i) = true;
            } else {
                switch (dataType) {
                    case TSDB_DATA_TYPE_INT:
                    case TSDB_DATA_TYPE_UINT:
                        *((int32_t*)stmtData->data + i) = atoi(tmpStr);
                        break;
                    case TSDB_DATA_TYPE_FLOAT:
                        *((float*)stmtData->data +i) = (float)atof(tmpStr);
                        break;
                    case TSDB_DATA_TYPE_DOUBLE:
                        *((double*)stmtData->data + i) = atof(tmpStr);
                        break;
                    case TSDB_DATA_TYPE_BOOL:
                    case TSDB_DATA_TYPE_TINYINT:
                    case TSDB_DATA_TYPE_UTINYINT:
                        *((int8_t*)stmtData->data + i) = (int8_t)atoi(tmpStr);
                        break;
                    case TSDB_DATA_TYPE_SMALLINT:
                    case TSDB_DATA_TYPE_USMALLINT:
                        *((int16_t*)stmtData->data + i) = (int16_t)atoi(tmpStr);
                        break;
                    case TSDB_DATA_TYPE_BIGINT:
                    case TSDB_DATA_TYPE_UBIGINT:
                    case TSDB_DATA_TYPE_TIMESTAMP:
                        // strtoll, not atol: long may be only 32 bits wide
                        *((int64_t*)stmtData->data + i) = (int64_t)strtoll(tmpStr, NULL, 10);
                        break;
                    case TSDB_DATA_TYPE_BINARY:
                    case TSDB_DATA_TYPE_NCHAR:
                    case TSDB_DATA_TYPE_VARBINARY:
                    case TSDB_DATA_TYPE_GEOMETRY:
                    case TSDB_DATA_TYPE_BLOB:
                        {
                            size_t tmpLen = strlen(tmpStr);
                            debugPrint("%s() LN%d, index: %d, "
                                    "tmpStr len: %"PRIu64", col->length: %d\n",
                                    __func__, __LINE__,
                                    i, (uint64_t)tmpLen, col->length);
                            // the first and last character are dropped below
                            // (presumably the surrounding quotes), hence the
                            // "- 2"; guard against tmpLen < 2 so the unsigned
                            // subtraction cannot wrap around
                            if (tmpLen > 2 && tmpLen - 2 > (size_t)col->length) {
                                errorPrint("data length %"PRIu64" "
                                        "is larger than column length %d\n",
                                        (uint64_t)tmpLen, col->length);
                            }
                            if (tmpLen > 2) {
                                strncpy((char *)stmtData->data
                                            + i * col->length,
                                        tmpStr+1,
                                        min(col->length, tmpLen - 2));
                            } else {
                                // empty value: just terminate the slot
                                strncpy((char *)stmtData->data
                                            + i*col->length,
                                        "", 1);
                            }
                        }
                        break;
                    case TSDB_DATA_TYPE_DECIMAL:
                    case TSDB_DATA_TYPE_DECIMAL64:
                        errorPrint("Not implemented data type in func initStmtDataValue: %s\n",
                                convertDatatypeToString(dataType));
                        exit(EXIT_FAILURE);

                    default:
                        break;
                }
            }
            free(tmpStr);
        }
    }
    return 0;
}
3285

3286
static void initStmtData(char dataType, void **data, uint32_t length) {
505✔
3287
    char *tmpP = NULL;
505✔
3288

3289
    switch (dataType) {
505!
3290
        case TSDB_DATA_TYPE_INT:
52✔
3291
        case TSDB_DATA_TYPE_UINT:
3292
            tmpP = calloc(1, sizeof(int) * g_arguments->prepared_rand);
52✔
3293
            assert(tmpP);
52!
3294
            tmfree(*data);
52✔
3295
            *data = (void*)tmpP;
52✔
3296
            break;
52✔
3297

3298
        case TSDB_DATA_TYPE_TINYINT:
14✔
3299
        case TSDB_DATA_TYPE_UTINYINT:
3300
            tmpP = calloc(1, sizeof(int8_t) * g_arguments->prepared_rand);
14✔
3301
            assert(tmpP);
14!
3302
            tmfree(*data);
14✔
3303
            *data = (void*)tmpP;
14✔
3304
            break;
14✔
3305

3306
        case TSDB_DATA_TYPE_SMALLINT:
14✔
3307
        case TSDB_DATA_TYPE_USMALLINT:
3308
            tmpP = calloc(1, sizeof(int16_t) * g_arguments->prepared_rand);
14✔
3309
            assert(tmpP);
14!
3310
            tmfree(*data);
14✔
3311
            *data = (void*)tmpP;
14✔
3312
            break;
14✔
3313

3314
        case TSDB_DATA_TYPE_BIGINT:
16✔
3315
        case TSDB_DATA_TYPE_UBIGINT:
3316
            tmpP = calloc(1, sizeof(int64_t) * g_arguments->prepared_rand);
16✔
3317
            assert(tmpP);
16!
3318
            tmfree(*data);
16✔
3319
            *data = (void*)tmpP;
16✔
3320
            break;
16✔
3321

3322
        case TSDB_DATA_TYPE_BOOL:
7✔
3323
            tmpP = calloc(1, sizeof(int8_t) * g_arguments->prepared_rand);
7✔
3324
            assert(tmpP);
7!
3325
            tmfree(*data);
7✔
3326
            *data = (void*)tmpP;
7✔
3327
            break;
7✔
3328

3329
        case TSDB_DATA_TYPE_FLOAT:
181✔
3330
            tmpP = calloc(1, sizeof(float) * g_arguments->prepared_rand);
181✔
3331
            assert(tmpP);
181!
3332
            tmfree(*data);
181✔
3333
            *data = (void*)tmpP;
181✔
3334
            break;
181✔
3335

3336
        case TSDB_DATA_TYPE_DOUBLE:
49✔
3337
            tmpP = calloc(1, sizeof(double) * g_arguments->prepared_rand);
49✔
3338
            assert(tmpP);
49!
3339
            tmfree(*data);
49✔
3340
            *data = (void*)tmpP;
49✔
3341
            break;
49✔
3342

3343
        case TSDB_DATA_TYPE_BINARY:
170✔
3344
        case TSDB_DATA_TYPE_NCHAR:
3345
        case TSDB_DATA_TYPE_VARBINARY:
3346
        case TSDB_DATA_TYPE_GEOMETRY:
3347
            tmpP = calloc(1, g_arguments->prepared_rand * length);
170✔
3348
            assert(tmpP);
170!
3349
            tmfree(*data);
170✔
3350
            *data = (void*)tmpP;
170✔
3351
            break;
170✔
3352

3353
        case TSDB_DATA_TYPE_TIMESTAMP:
2✔
3354
            tmpP = calloc(1, sizeof(int64_t) * g_arguments->prepared_rand);
2✔
3355
            assert(tmpP);
2!
3356
            tmfree(*data);
2✔
3357
            *data = (void*)tmpP;
2✔
3358
            break;
2✔
3359

3360
        case TSDB_DATA_TYPE_DECIMAL:
×
3361
        case TSDB_DATA_TYPE_DECIMAL64:
3362
            errorPrint("Not implemented data type in func initStmtData: %s\n",
×
3363
                       convertDatatypeToString(dataType));
3364
            exit(EXIT_FAILURE);
×
3365

3366
        case TSDB_DATA_TYPE_BLOB: {
×
3367
            tmpP = calloc(1, g_arguments->prepared_rand * length);
×
3368
            assert(tmpP);
×
3369
            tmfree(*data);
×
3370
            *data = (void *)tmpP;
×
3371
            break;
×
3372
        }
3373
        default:
×
3374

3375
            errorPrint("Unknown data type on initStmtData: %s\n",
×
3376
                       convertDatatypeToString(dataType));
3377
            exit(EXIT_FAILURE);
×
3378
    }
3379
}
505✔
3380

3381
static int parseBufferToStmtBatchChildTbl(SSuperTable *stbInfo,
8✔
3382
                                          SChildTable* childTbl, uint64_t *bind_ts_array) {
3383
    int32_t columnCount = stbInfo->cols->size;
8✔
3384

3385
    for (int c = 0; c < columnCount; c++) {
28✔
3386
        Field *col = benchArrayGet(stbInfo->cols, c);
20✔
3387
        ChildField *childCol = benchArrayGet(childTbl->childCols, c);
20✔
3388
        char dataType = col->type;
20✔
3389

3390
        // malloc memory
3391
        tmfree(childCol->stmtData.is_null);
20✔
3392
        tmfree(childCol->stmtData.lengths);
20✔
3393
        childCol->stmtData.is_null = benchCalloc(sizeof(char),     g_arguments->prepared_rand, true);
20✔
3394
        childCol->stmtData.lengths = benchCalloc(sizeof(int32_t),  g_arguments->prepared_rand, true);
20✔
3395

3396
        initStmtData(dataType, &(childCol->stmtData.data), col->length);
20✔
3397
    }
3398

3399
    return initStmtDataValue(stbInfo, childTbl, bind_ts_array);
8✔
3400
}
3401

3402
/*
 * Prepare the stmt bind buffers shared by all child tables of a super table:
 * each column's is_null / lengths / data arrays are re-created with
 * g_arguments->prepared_rand elements, then filled from the super table's
 * sample buffer by initStmtDataValue(), whose result is returned.
 */
static int parseBufferToStmtBatch(SSuperTable* stbInfo, uint64_t *bind_ts_array) {
    int32_t nCols = stbInfo->cols->size;
    int     colIdx = 0;

    while (colIdx < nCols) {
        Field    *column = benchArrayGet(stbInfo->cols, colIdx);
        StmtData *bind   = &column->stmtData;

        // re-allocate: the element count is g_arguments->prepared_rand
        tmfree(bind->is_null);
        bind->is_null = benchCalloc(sizeof(char), g_arguments->prepared_rand, false);
        tmfree(bind->lengths);
        bind->lengths = benchCalloc(sizeof(int32_t), g_arguments->prepared_rand, false);

        initStmtData(column->type, &(bind->data), column->length);

        colIdx++;
    }

    return initStmtDataValue(stbInfo, NULL, bind_ts_array);
}
3419

3420
/*
 * Give every child table the name "<childTblPrefix><sequence>", with the
 * sequence running from 0 to childTblCount - 1. Nothing is generated when
 * useTagTableName is set. Returns stbInfo->childTblCount in both cases.
 */
static int64_t fillChildTblNameByCount(SSuperTable *stbInfo) {
    if (!stbInfo->useTagTableName) {
        int64_t seq = 0;
        while (seq < stbInfo->childTblCount) {
            char nameBuf[TSDB_TABLE_NAME_LEN]={0};
            snprintf(nameBuf, TSDB_TABLE_NAME_LEN,
                     "%s%" PRIu64,
                     stbInfo->childTblPrefix, seq);

            // the child table keeps its own heap copy of the name
            SChildTable *child = stbInfo->childTblArray[seq];
            child->name = strdup(nameBuf);
            debugPrint("%s(): %s\n", __func__,
                      stbInfo->childTblArray[seq]->name);

            seq++;
        }
    }

    return stbInfo->childTblCount;
}
3438

3439
static int64_t fillChildTblNameByFromTo(SDataBase *database,
3✔
3440
        SSuperTable* stbInfo) {
3441
    for (int64_t i = stbInfo->childTblFrom; i <= stbInfo->childTblTo; i++) {
13✔
3442
        char childName[TSDB_TABLE_NAME_LEN]={0};
10✔
3443
        snprintf(childName,
10✔
3444
                TSDB_TABLE_NAME_LEN,
3445
                "%s%" PRIu64,
3446
                stbInfo->childTblPrefix, i);
3447
        stbInfo->childTblArray[i]->name = strdup(childName);
10✔
3448
    }
3449

3450
    return (stbInfo->childTblTo-stbInfo->childTblFrom);
3✔
3451
}
3452

3453
static int64_t fillChildTblNameByLimitOffset(SDataBase *database,
5✔
3454
        SSuperTable* stbInfo) {
3455
    SBenchConn* conn = initBenchConn(database->dbName);
5✔
3456
    if (NULL == conn) {
5!
3457
        return -1;
×
3458
    }
3459
    char cmd[SHORT_1K_SQL_BUFF_LEN] = "\0";
5✔
3460
    if (g_arguments->taosc_version == 3) {
5!
3461
        snprintf(cmd, SHORT_1K_SQL_BUFF_LEN,
5✔
3462
                 "SELECT DISTINCT(TBNAME) FROM %s.`%s` LIMIT %" PRId64
3463
                 " OFFSET %" PRIu64,
3464
                 database->dbName, stbInfo->stbName, stbInfo->childTblLimit,
3465
                 stbInfo->childTblOffset);
3466
    } else {
3467
        snprintf(cmd, SHORT_1K_SQL_BUFF_LEN,
×
3468
                 "SELECT TBNAME FROM %s.`%s` LIMIT %" PRId64
3469
                 " OFFSET %" PRIu64,
3470
                 database->dbName, stbInfo->stbName, stbInfo->childTblLimit,
3471
                 stbInfo->childTblOffset);
3472
    }
3473
    debugPrint("cmd: %s\n", cmd);
5!
3474
    TAOS_RES *res = taos_query(conn->taos, cmd);
5✔
3475
    int32_t   code = taos_errno(res);
5✔
3476
    int64_t   count = 0;
5✔
3477
    if (code) {
5✔
3478
        printErrCmdCodeStr(cmd, code, res);
3✔
3479
        closeBenchConn(conn);
3✔
3480
        return -1;
3✔
3481
    }
3482
    TAOS_ROW row = NULL;
2✔
3483
    while ((row = taos_fetch_row(res)) != NULL) {
7✔
3484
        int *lengths = taos_fetch_lengths(res);
5✔
3485
        char * childName = benchCalloc(1, lengths[0] + 1, true);
5✔
3486
        strncpy(childName, row[0], lengths[0]);
5✔
3487
        childName[lengths[0]] = '\0';
5✔
3488
        stbInfo->childTblArray[count]->name = childName;
5✔
3489
        debugPrint("stbInfo->childTblArray[%" PRId64 "]->name: %s\n",
5!
3490
                   count, stbInfo->childTblArray[count]->name);
3491
        count++;
5✔
3492
    }
3493
    taos_free_result(res);
2✔
3494
    closeBenchConn(conn);
2✔
3495
    return count;
2✔
3496
}
3497

3498
// Reconcile stbInfo->interlaceRows, stbInfo->insertRows and
// g_arguments->reqPerReq before inserting:
//   1. interlaceRows is capped to reqPerReq;
//   2. interlaceRows larger than insertRows is reset to 0;
//   3. with interlaceRows == 0, reqPerReq is capped to insertRows;
//   4. stmt interface + interlace + auto table creation is rejected and the
//      process exits.
// The checks run in this order; a later check sees the result of an earlier one.
static void preProcessArgument(SSuperTable *stbInfo) {
    // 1. cap interlaceRows
    if (g_arguments->reqPerReq < stbInfo->interlaceRows) {
        infoPrint(
            "interlaceRows(%d) is larger than record per request(%u), which "
            "will be set to %u\n",
            stbInfo->interlaceRows, g_arguments->reqPerReq,
            g_arguments->reqPerReq);
        stbInfo->interlaceRows = g_arguments->reqPerReq;
    }

    // 2. interlace batch can not exceed the rows to insert
    if (stbInfo->insertRows < stbInfo->interlaceRows) {
        infoPrint(
                "interlaceRows larger than insertRows %d > %" PRId64 "\n",
                stbInfo->interlaceRows, stbInfo->insertRows);
        infoPrint("%s", "interlaceRows will be set to 0\n");
        stbInfo->interlaceRows = 0;
    }

    // 3. cap reqPerReq when interlaceRows is 0
    if (0 == stbInfo->interlaceRows) {
        if (stbInfo->insertRows < g_arguments->reqPerReq) {
            infoPrint("record per request (%u) is larger than "
                    "insert rows (%"PRIu64")"
                    " in progressive mode, which will be set to %"PRIu64"\n",
                    g_arguments->reqPerReq, stbInfo->insertRows,
                    stbInfo->insertRows);
            g_arguments->reqPerReq = stbInfo->insertRows;
        }
    }

    // 4. unsupported combination
    if (stbInfo->interlaceRows > 0) {
        if (stbInfo->iface == STMT_IFACE && stbInfo->autoTblCreating) {
            errorPrint("%s","stmt not support autocreate table with interlace row , quit programe!\n");
            exit(-1);
        }
    }
}
256✔
3532

3533
static int printTotalDelay(SDataBase *database,
256✔
3534
                           int64_t totalDelay,
3535
                           int64_t totalDelay1,
3536
                           int64_t totalDelay2,
3537
                           int64_t totalDelay3,
3538
                           BArray *total_delay_list,
3539
                            int threads,
3540
                            int64_t totalInsertRows,
3541
                            int64_t spend) {
3542
    // zero check
3543
    if (total_delay_list->size == 0 || spend == 0 || threads == 0) {
256!
3544
        return -1;
1✔
3545
    }
3546

3547
    char subDelay[128] = "";
255✔
3548
    if(totalDelay1 + totalDelay2 + totalDelay3 > 0) {
255✔
3549
        sprintf(subDelay, " stmt delay1=%.2fs delay2=%.2fs delay3=%.2fs",
44✔
3550
                totalDelay1/threads/1E6,
44✔
3551
                totalDelay2/threads/1E6,
44✔
3552
                totalDelay3/threads/1E6);
44✔
3553
    }
3554

3555
    double time_cost = spend / 1E6;
255✔
3556
    double real_time_cost = totalDelay/threads/1E6;
255✔
3557
    double records_per_second = (double)(totalInsertRows / (spend/1E6));
255✔
3558
    double real_records_per_second = (double)(totalInsertRows / (totalDelay/threads/1E6));
255✔
3559

3560
    succPrint("Spent %.6f (real %.6f) seconds to insert rows: %" PRIu64
255!
3561
              " with %d thread(s) into %s %.2f (real %.2f) records/second%s\n",
3562
              time_cost, real_time_cost, totalInsertRows, threads,
3563
              database->dbName, records_per_second,
3564
              real_records_per_second, subDelay);
3565

3566
    if (!total_delay_list->size) {
255!
3567
        return -1;
×
3568
    }
3569
    
3570
    double minDelay = *(int64_t *)(benchArrayGet(total_delay_list, 0))/1E3;
255✔
3571
    double avgDelay = (double)totalDelay/total_delay_list->size/1E3;
255✔
3572
    double p90 = *(int64_t *)(benchArrayGet(total_delay_list,
510✔
3573
                                         (int32_t)(total_delay_list->size
255✔
3574
                                         * 0.9)))/1E3;
255✔
3575
    double p95 = *(int64_t *)(benchArrayGet(total_delay_list,
510✔
3576
                                         (int32_t)(total_delay_list->size
255✔
3577
                                         * 0.95)))/1E3;
255✔
3578
    double p99 = *(int64_t *)(benchArrayGet(total_delay_list,
510✔
3579
                                         (int32_t)(total_delay_list->size
255✔
3580
                                         * 0.99)))/1E3;
255✔
3581
    double maxDelay = *(int64_t *)(benchArrayGet(total_delay_list,
510✔
3582
                                         (int32_t)(total_delay_list->size
255✔
3583
                                         - 1)))/1E3;                                     
255✔
3584

3585
    succPrint("insert delay, "
255!
3586
              "min: %.4fms, "
3587
              "avg: %.4fms, "
3588
              "p90: %.4fms, "
3589
              "p95: %.4fms, "
3590
              "p99: %.4fms, "
3591
              "max: %.4fms\n",
3592
            minDelay, avgDelay, p90, p95, p99, maxDelay);
3593
    
3594
    if (g_arguments->output_json_file) {
255!
3595
        tools_cJSON *root = tools_cJSON_CreateObject();
×
3596
        if (root) {
×
3597
            tools_cJSON_AddStringToObject(root, "db_name", database->dbName);
×
3598
            tools_cJSON_AddNumberToObject(root, "inserted_rows", totalInsertRows);
×
3599
            tools_cJSON_AddNumberToObject(root, "threads", threads);
×
3600
            tools_cJSON_AddNumberToObject(root, "time_cost", time_cost);
×
3601
            tools_cJSON_AddNumberToObject(root, "real_time_cost", real_time_cost);
×
3602
            tools_cJSON_AddNumberToObject(root, "records_per_second",  records_per_second);
×
3603
            tools_cJSON_AddNumberToObject(root, "real_records_per_second", real_records_per_second);
×
3604
            
3605
            tools_cJSON_AddNumberToObject(root, "avg", avgDelay);
×
3606
            tools_cJSON_AddNumberToObject(root, "min", minDelay);
×
3607
            tools_cJSON_AddNumberToObject(root, "max", maxDelay);
×
3608
            tools_cJSON_AddNumberToObject(root, "p90", p90);
×
3609
            tools_cJSON_AddNumberToObject(root, "p95", p95);
×
3610
            tools_cJSON_AddNumberToObject(root, "p99", p99);
×
3611
            
3612
            char *jsonStr = tools_cJSON_PrintUnformatted(root);
×
3613
            if (jsonStr) {
×
3614
                FILE *fp = fopen(g_arguments->output_json_file, "w");
×
3615
                if (fp) {
×
3616
                    fprintf(fp, "%s\n", jsonStr);
×
3617
                    fclose(fp);
×
3618
                } else {
3619
                    errorPrint("Failed to open output JSON file, file name %s\n",
×
3620
                            g_arguments->output_json_file);
3621
                }
3622
                free(jsonStr);
×
3623
            }
3624
            tools_cJSON_Delete(root);
×
3625
        }
3626
    }        
3627
    return 0;
255✔
3628
}
3629

3630
// Pick the naming strategy for child tables:
//   - childTblLimit set             -> fillChildTblNameByLimitOffset()
//   - childTblFrom or childTblTo set -> fillChildTblNameByFromTo()
//   - otherwise                      -> fillChildTblNameByCount()
// Returns whatever the selected helper returns.
static int64_t fillChildTblNameImp(SDataBase *database, SSuperTable *stbInfo) {
    if (stbInfo->childTblLimit) {
        return fillChildTblNameByLimitOffset(database, stbInfo);
    }

    if (stbInfo->childTblFrom || stbInfo->childTblTo) {
        return fillChildTblNameByFromTo(database, stbInfo);
    }

    return fillChildTblNameByCount(stbInfo);
}
3641

3642
// Allocate stbInfo->childTblArray (childTblCount entries, one SChildTable
// each) and fill in the table names:
//   - exactly one table and no tags: the single entry is named after
//     stbInfo->stbName;
//   - non-SML interface with childTblExists: fillChildTblNameImp();
//   - everything else: fillChildTblNameByCount().
// Returns childTblCount for the first case, otherwise the helper's result.
static int64_t fillChildTblName(SDataBase *database, SSuperTable *stbInfo) {
    int64_t ntables = stbInfo->childTblCount;

    // pointer array first, then one zeroed SChildTable per slot
    stbInfo->childTblArray = benchCalloc(stbInfo->childTblCount,
            sizeof(SChildTable*), true);
    int64_t slot = 0;
    while (slot < stbInfo->childTblCount) {
        stbInfo->childTblArray[slot] =
            benchCalloc(1, sizeof(SChildTable), true);
        slot++;
    }

    // Normal table
    if (stbInfo->childTblCount == 1 && stbInfo->tags->size == 0) {
        char tableName[TSDB_TABLE_NAME_LEN]={0};
        snprintf(tableName, TSDB_TABLE_NAME_LEN,
                    "%s", stbInfo->stbName);
        stbInfo->childTblArray[0]->name = strdup(tableName);
        return ntables;
    }

    bool viaSml = (stbInfo->iface == SML_IFACE
            || stbInfo->iface == SML_REST_IFACE);
    if (!viaSml && stbInfo->childTblExists) {
        ntables = fillChildTblNameImp(database, stbInfo);
    } else {
        ntables = fillChildTblNameByCount(stbInfo);
    }

    return ntables;
}
3667

3668
// last ts fill to filllBackTime
3669
static bool fillSTableLastTs(SDataBase *database, SSuperTable *stbInfo) {
×
3670
    SBenchConn* conn = initBenchConn(database->dbName);
×
3671
    if (NULL == conn) {
×
3672
        return false;
×
3673
    }
3674
    char cmd[SHORT_1K_SQL_BUFF_LEN] = "\0";
×
3675
    snprintf(cmd, SHORT_1K_SQL_BUFF_LEN, "select last(ts) from %s.`%s`", database->dbName, stbInfo->stbName);
×
3676

3677
    infoPrint("fillBackTime: %s\n", cmd);
×
3678
    TAOS_RES *res = taos_query(conn->taos, cmd);
×
3679
    int32_t   code = taos_errno(res);
×
3680
    if (code) {
×
3681
        printErrCmdCodeStr(cmd, code, res);
×
3682
        closeBenchConn(conn);
×
3683
        return false;
×
3684
    }
3685

3686
    TAOS_ROW row = taos_fetch_row(res);
×
3687
    if(row == NULL) {
×
3688
        taos_free_result(res);
×
3689
        closeBenchConn(conn);
×
3690
        return false;
×
3691
    }
3692
    
3693
    char lastTs[128];
3694
    memset(lastTs, 0, sizeof(lastTs));
×
3695

3696
    stbInfo->startFillbackTime = *(int64_t*)row[0];
×
3697
    toolsFormatTimestamp(lastTs, stbInfo->startFillbackTime, database->precision);
×
3698
    infoPrint("fillBackTime: get ok %s.%s last ts=%s \n", database->dbName, stbInfo->stbName, lastTs);
×
3699
    
3700
    taos_free_result(res);
×
3701
    closeBenchConn(conn);
×
3702

3703
    return true;
×
3704
}
3705

3706
// calcNow expression fill to timestamp_start
3707
static bool calcExprFromServer(SDataBase *database, SSuperTable *stbInfo) {
5✔
3708
    SBenchConn* conn = initBenchConn(database->dbName);
5✔
3709
    if (NULL == conn) {
5!
3710
        return false;
×
3711
    }
3712
    char cmd[SHORT_1K_SQL_BUFF_LEN] = "\0";
5✔
3713
    snprintf(cmd, SHORT_1K_SQL_BUFF_LEN, "select %s", stbInfo->calcNow);
5✔
3714

3715
    infoPrint("calcExprFromServer: %s\n", cmd);
5✔
3716
    TAOS_RES *res = taos_query(conn->taos, cmd);
5✔
3717
    int32_t   code = taos_errno(res);
5✔
3718
    if (code) {
5!
3719
        printErrCmdCodeStr(cmd, code, res);
×
3720
        closeBenchConn(conn);
×
3721
        return false;
×
3722
    }
3723

3724
    TAOS_ROW row = taos_fetch_row(res);
5✔
3725
    if(row == NULL) {
5!
3726
        taos_free_result(res);
×
3727
        closeBenchConn(conn);
×
3728
        return false;
×
3729
    }
3730
    
3731
    char ts[128];
3732
    memset(ts, 0, sizeof(ts));
5✔
3733

3734
    stbInfo->startTimestamp = *(int64_t*)row[0];
5✔
3735
    toolsFormatTimestamp(ts, stbInfo->startTimestamp, database->precision);
5✔
3736
    infoPrint("calcExprFromServer: get ok.  %s = %s \n", stbInfo->calcNow, ts);
5✔
3737
    
3738
    taos_free_result(res);
5✔
3739
    closeBenchConn(conn);
5✔
3740

3741
    return true;
5✔
3742
}
3743

3744
// Work out how many child tables this run covers:
//   - childTblTo > 0                         -> childTblTo - childTblFrom
//   - childTblLimit > 0 with childTblExists  -> childTblLimit
//   - otherwise                              -> childTblCount
// The database argument is not used.
int64_t obtainTableCount(SDataBase* database, SSuperTable* stbInfo) {
    if (stbInfo->childTblTo > 0) {
        return stbInfo->childTblTo - stbInfo->childTblFrom;
    }

    if (stbInfo->childTblLimit > 0 && stbInfo->childTblExists) {
        return stbInfo->childTblLimit;
    }

    return stbInfo->childTblCount;
}
3757

3758
// assign table to thread with vgroups, return assign thread count
3759
int32_t assignTableToThread(SDataBase* database, SSuperTable* stbInfo) {
2✔
3760
    int32_t threads = 0;
2✔
3761

3762
    // calc table count per vgroup
3763
    for (int64_t i = 0; i < stbInfo->childTblCount; i++) {
42✔
3764
        int32_t vgIdx = calcGroupIndex(database->dbName, stbInfo->childTblArray[i]->name, database->vgroups);
40✔
3765
        if (vgIdx == -1) {
40!
3766
            continue;
×
3767
        }
3768
        SVGroup *vg = benchArrayGet(database->vgArray, vgIdx);
40✔
3769
        vg->tbCountPerVgId ++;
40✔
3770
    }
3771

3772
    // malloc vg->childTblArray memory with table count
3773
    for (int v = 0; v < database->vgroups; v++) {
22✔
3774
        SVGroup *vg = benchArrayGet(database->vgArray, v);
20✔
3775
        infoPrint("Local hash calc %"PRId64" tables on %s's vgroup %d (id: %d)\n",
20✔
3776
                    vg->tbCountPerVgId, database->dbName, v, vg->vgId);
3777
        if (vg->tbCountPerVgId) {
20✔
3778
            threads++;
14✔
3779
        } else {
3780
            continue;
6✔
3781
        }
3782
        vg->childTblArray = benchCalloc(vg->tbCountPerVgId, sizeof(SChildTable*), true);
14✔
3783
        vg->tbOffset      = 0;
14✔
3784
    }
3785
    
3786
    // set vg->childTblArray data
3787
    for (int64_t i = 0; i < stbInfo->childTblCount; i++) {
42✔
3788
        int32_t vgIdx = calcGroupIndex(database->dbName, stbInfo->childTblArray[i]->name, database->vgroups);
40✔
3789
        if (vgIdx == -1) {
40!
3790
            continue;
×
3791
        }
3792
        SVGroup *vg = benchArrayGet(database->vgArray, vgIdx);
40✔
3793
        debugPrint("calc table hash to vgroup %s.%s vgIdx=%d\n",
40✔
3794
                    database->dbName,
3795
                    stbInfo->childTblArray[i]->name, vgIdx);
3796
        vg->childTblArray[vg->tbOffset] = stbInfo->childTblArray[i];
40✔
3797
        vg->tbOffset++;
40✔
3798
    }
3799
    return threads;
2✔
3800
}
3801

3802
// init stmt
3803
TAOS_STMT* initStmt(TAOS* taos, bool single) {
110✔
3804
    if (!single) {
110!
3805
        infoPrint("initStmt call taos_stmt_init single=%d\n", single);
×
3806
        return taos_stmt_init(taos);
×
3807
    }
3808

3809
    TAOS_STMT_OPTIONS op;
3810
    memset(&op, 0, sizeof(op));
110✔
3811
    op.singleStbInsert      = single;
110✔
3812
    op.singleTableBindOnce  = single;
110✔
3813
    infoPrint("initStmt call taos_stmt_init_with_options single=%d\n", single);
110✔
3814
    return taos_stmt_init_with_options(taos, &op);
110✔
3815
}
3816

3817
// init stmt2
3818
TAOS_STMT2* initStmt2(TAOS* taos, bool single) {
87✔
3819
    TAOS_STMT2_OPTION op2;
3820
    memset(&op2, 0, sizeof(op2));
87✔
3821
    op2.singleStbInsert      = single;
87✔
3822
    op2.singleTableBindOnce  = single;
87✔
3823
    
3824
    TAOS_STMT2* stmt2 = taos_stmt2_init(taos, &op2);
87✔
3825
    if (stmt2) 
87!
3826
        succPrint("succ  taos_stmt2_init single=%d\n", single);
87!
3827
    else
3828
        errorPrint("failed taos_stmt2_init single=%d\n", single);
×
3829
    return stmt2;
87✔
3830
}
3831

3832
// init insert thread
3833
void initTsArray(uint64_t *bind_ts_array, SSuperTable* stbInfo) {
44✔
3834
    parseBufferToStmtBatch(stbInfo, bind_ts_array);
44✔
3835
    for (int64_t child = 0; child < stbInfo->childTblCount; child++) {
11,020✔
3836
        SChildTable *childTbl = stbInfo->childTblArray[child];
10,976✔
3837
        if (childTbl->useOwnSample) {
10,976✔
3838
            parseBufferToStmtBatchChildTbl(stbInfo, childTbl, bind_ts_array);
8✔
3839
        }
3840
    }
3841
    
3842
}
44✔
3843

3844
void *genInsertTheadInfo(void* arg) {
1,248✔
3845

3846
    if (g_arguments->terminate || g_fail) {
1,248!
3847
        return NULL;
×
3848
    }
3849

3850
    threadInfo* pThreadInfo = (threadInfo*)arg;
1,248✔
3851
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
1,248✔
3852
    pThreadInfo->delayList = benchArrayInit(1, sizeof(int64_t));
1,248✔
3853
    switch (stbInfo->iface) {
1,248!
3854
        // rest
3855
        case REST_IFACE: {
4✔
3856
            if (stbInfo->interlaceRows > 0) {
4!
3857
                pThreadInfo->buffer = new_ds(0);
×
3858
            } else {
3859
                pThreadInfo->buffer = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, true);
4✔
3860
            }
3861
            int sockfd = createSockFd();
4✔
3862
            if (sockfd < 0) {
4!
3863
                g_fail = true;
×
3864
                goto END;
×
3865
            }
3866
            pThreadInfo->sockfd = sockfd;
4✔
3867
            break;
4✔
3868
        }            
3869
        // stmt & stmt2 init
3870
        case STMT_IFACE: 
197✔
3871
        case STMT2_IFACE: {
3872
            pThreadInfo->conn = initBenchConn(pThreadInfo->dbInfo->dbName);
197✔
3873
            if (NULL == pThreadInfo->conn) {
197!
3874
                goto END;
×
3875
            }
3876
            // single always true for benchmark
3877
            bool single = true;
197✔
3878
            if (stbInfo->iface == STMT2_IFACE) {
197✔
3879
                // stmt2 init
3880
                if (pThreadInfo->conn->stmt2)
87!
3881
                    taos_stmt2_close(pThreadInfo->conn->stmt2);
×
3882
                pThreadInfo->conn->stmt2 = initStmt2(pThreadInfo->conn->taos, single);
87✔
3883
                if (NULL == pThreadInfo->conn->stmt2) {
87!
3884
                    errorPrint("taos_stmt2_init() failed, reason: %s\n", taos_errstr(NULL));
×
3885
                    g_fail = true;
×
3886
                    goto END;                    
×
3887
                }
3888
            } else {
3889
                // stmt init
3890
                if (pThreadInfo->conn->stmt)
110!
3891
                    taos_stmt_close(pThreadInfo->conn->stmt);
×
3892
                pThreadInfo->conn->stmt = initStmt(pThreadInfo->conn->taos, single);
110✔
3893
                if (NULL == pThreadInfo->conn->stmt) {
110!
3894
                    errorPrint("taos_stmt_init() failed, reason: %s\n", taos_errstr(NULL));
×
3895
                    g_fail = true;
×
3896
                    goto END;                    
×
3897
                }
3898
            }
3899

3900
            // select db
3901
            if (taos_select_db(pThreadInfo->conn->taos, pThreadInfo->dbInfo->dbName)) {
197!
3902
                errorPrint("taos select database(%s) failed\n", pThreadInfo->dbInfo->dbName);
×
3903
                g_fail = true;
×
3904
                goto END;
×
3905
            }
3906

3907
            // malloc bind
3908
            int32_t unit = stbInfo->iface == STMT2_IFACE ? sizeof(TAOS_STMT2_BIND) : sizeof(TAOS_MULTI_BIND);
197✔
3909
            pThreadInfo->bind_ts       = benchCalloc(1, sizeof(int64_t), true);
197✔
3910
            
3911
            pThreadInfo->bindParams    = benchCalloc(1, unit * (stbInfo->cols->size + 1), true);
195✔
3912
            // have ts columns, so size + 1
3913
            pThreadInfo->lengths       = benchCalloc(stbInfo->cols->size + 1, sizeof(int32_t*), true);
194✔
3914
            for(int32_t c = 0; c <= stbInfo->cols->size; c++) {
2,254✔
3915
                pThreadInfo->lengths[c] = benchCalloc(g_arguments->reqPerReq, sizeof(int32_t), true);
2,057✔
3916
            }
3917
            // tags data
3918
            pThreadInfo->tagsStmt = copyBArray(stbInfo->tags);
197✔
3919
            for(int32_t n = 0; n < pThreadInfo->tagsStmt->size; n ++ ) {
1,094✔
3920
                Field *field = benchArrayGet(pThreadInfo->tagsStmt, n);
897✔
3921
                memset(&field->stmtData, 0, sizeof(StmtData));
898✔
3922
            }
3923
        
3924
            break;
197✔
3925
        }
3926
        // sml rest
3927
        case SML_REST_IFACE: {
13✔
3928
            int sockfd = createSockFd();
13✔
3929
            if (sockfd < 0) {
13!
3930
                g_fail = true;
×
3931
                goto END;
×
3932
            }
3933
            pThreadInfo->sockfd = sockfd;
13✔
3934
        }            
3935
        // sml
3936
        case SML_IFACE: {
167✔
3937
            if (stbInfo->iface == SML_IFACE) {
167✔
3938
                pThreadInfo->conn = initBenchConn(pThreadInfo->dbInfo->dbName);
154✔
3939
                if (pThreadInfo->conn == NULL) {
154!
3940
                    errorPrint("%s() init connection failed\n", __func__);
×
3941
                    g_fail = true;
×
3942
                    goto END;
×
3943
                }
3944
                if (taos_select_db(pThreadInfo->conn->taos, pThreadInfo->dbInfo->dbName)) {
154!
3945
                    errorPrint("taos select database(%s) failed\n", pThreadInfo->dbInfo->dbName);
×
3946
                    g_fail = true;
×
3947
                    goto END;
×
3948
                }
3949
            }
3950
            pThreadInfo->max_sql_len = stbInfo->lenOfCols + stbInfo->lenOfTags;
162✔
3951
            if (stbInfo->iface == SML_REST_IFACE) {
162✔
3952
                pThreadInfo->buffer = benchCalloc(1, g_arguments->reqPerReq * (1 + pThreadInfo->max_sql_len), true);
13✔
3953
            }                
3954
            int protocol = stbInfo->lineProtocol;
162✔
3955
            if (TSDB_SML_JSON_PROTOCOL != protocol && SML_JSON_TAOS_FORMAT != protocol) {
674✔
3956
                pThreadInfo->sml_tags = (char **)benchCalloc(pThreadInfo->ntables, sizeof(char *), true);
92✔
3957
                for (int t = 0; t < pThreadInfo->ntables; t++) {
640✔
3958
                    pThreadInfo->sml_tags[t] = benchCalloc(1, stbInfo->lenOfTags, true);
549✔
3959
                }
3960
                int64_t index = pThreadInfo->start_table_from;
91✔
3961
                for (int t = 0; t < pThreadInfo->ntables; t++) {
642✔
3962
                    if (generateRandData(
551!
3963
                                stbInfo, pThreadInfo->sml_tags[t],
551✔
3964
                                stbInfo->lenOfTags,
551✔
3965
                                stbInfo->lenOfCols + stbInfo->lenOfTags,
551✔
3966
                                stbInfo->tags, 1, true, NULL, index++)) {
3967
                        g_fail = true;            
×
3968
                        goto END;
×
3969
                    }
3970
                    debugPrint("pThreadInfo->sml_tags[%d]: %s\n", t,
551!
3971
                               pThreadInfo->sml_tags[t]);
3972
                }
3973
                pThreadInfo->lines = benchCalloc(g_arguments->reqPerReq, sizeof(char *), true);
91✔
3974
                for (int j = 0; (j < g_arguments->reqPerReq && !g_arguments->terminate); j++) {
99,438✔
3975
                    pThreadInfo->lines[j] = benchCalloc(1, pThreadInfo->max_sql_len, true);
98,926✔
3976
                }
3977
            } else {
3978
                pThreadInfo->json_array          = tools_cJSON_CreateArray();
70✔
3979
                pThreadInfo->sml_json_tags       = tools_cJSON_CreateArray();
73✔
3980
                pThreadInfo->sml_tags_json_array = (char **)benchCalloc( pThreadInfo->ntables, sizeof(char *), true);
74✔
3981
                for (int t = 0; t < pThreadInfo->ntables; t++) {
219✔
3982
                    if (stbInfo->lineProtocol == TSDB_SML_JSON_PROTOCOL) {
144✔
3983
                        generateSmlJsonTags(
72✔
3984
                            pThreadInfo->sml_json_tags,
3985
                                pThreadInfo->sml_tags_json_array,
3986
                                stbInfo,
3987
                            pThreadInfo->start_table_from, t);
3988
                    } else {
3989
                        generateSmlTaosJsonTags(
72✔
3990
                            pThreadInfo->sml_json_tags, stbInfo,
3991
                            pThreadInfo->start_table_from, t);
3992
                    }
3993
                }
3994
                pThreadInfo->lines = (char **)benchCalloc(1, sizeof(char *), true);
75✔
3995
                if (0 == stbInfo->interlaceRows && TSDB_SML_JSON_PROTOCOL == protocol) {
75!
3996
                    pThreadInfo->line_buf_len = g_arguments->reqPerReq * accumulateRowLen(pThreadInfo->stbInfo->tags, pThreadInfo->stbInfo->iface);
38✔
3997
                    debugPrint("%s() LN%d, line_buf_len=%d\n", __func__, __LINE__, pThreadInfo->line_buf_len);
38!
3998
                    pThreadInfo->lines[0]             = benchCalloc(1, pThreadInfo->line_buf_len, true);
38✔
3999
                    pThreadInfo->sml_json_value_array = (char **)benchCalloc(pThreadInfo->ntables, sizeof(char *), true);
38✔
4000
                    for (int t = 0; t < pThreadInfo->ntables; t++) {
×
4001
                        generateSmlJsonValues(pThreadInfo->sml_json_value_array, stbInfo, t);
74✔
4002
                    }
4003
                }
4004
            }
4005
            break;
167✔
4006
        }
4007
        // taos
4008
        case TAOSC_IFACE: {
880✔
4009
            pThreadInfo->conn = initBenchConn(pThreadInfo->dbInfo->dbName);
880✔
4010
            if (pThreadInfo->conn == NULL) {
880!
4011
                errorPrint("%s() failed to connect\n", __func__);
×
4012
                g_fail = true;
×
4013
                goto END;
×
4014
            }
4015
            char* command = benchCalloc(1, SHORT_1K_SQL_BUFF_LEN, false);
880✔
4016
            snprintf(command, SHORT_1K_SQL_BUFF_LEN,
880✔
4017
                    g_arguments->escape_character ? "USE `%s`" : "USE %s",
880✔
4018
                    pThreadInfo->dbInfo->dbName);
880✔
4019
            if (queryDbExecCall(pThreadInfo->conn, command)) {
880!
4020
                errorPrint("taos select database(%s) failed\n", pThreadInfo->dbInfo->dbName);
×
4021
                g_fail = true;
×
4022
                goto END;
×
4023
            }
4024
            tmfree(command);
876✔
4025
            command = NULL;
878✔
4026

4027
            if (stbInfo->interlaceRows > 0) {
878✔
4028
                pThreadInfo->buffer = new_ds(0);
37✔
4029
            } else {
4030
                pThreadInfo->buffer = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, true);
841✔
4031
                if (g_arguments->check_sql) {
843✔
4032
                    pThreadInfo->csql = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, true);
20✔
4033
                    memset(pThreadInfo->csql, 0, TSDB_MAX_ALLOWED_SQL_LEN);
20✔
4034
                }
4035
            }
4036

4037
            break;
880✔
4038
        }
4039
        default:
×
4040
            break;
×
4041
    }
4042

4043
END:
1,248✔
4044
    return NULL;
1,248✔
4045
}
4046
    
4047

4048
// init insert thread
4049
int32_t initInsertThread(SDataBase* database, SSuperTable* stbInfo, int32_t nthreads, threadInfo *infos, int64_t div, int64_t mod) {
256✔
4050
    int32_t  ret     = -1;
256✔
4051
    uint64_t tbNext  = stbInfo->childTblFrom;
256✔
4052
    int32_t  vgNext  = 0;
256✔
4053
    pthread_t   *pids  = benchCalloc(1, nthreads * sizeof(pthread_t),  true);
256✔
4054
    int threadCnt = 0;
256✔
4055
    uint64_t * bind_ts_array = NULL;
256✔
4056
    if (STMT2_IFACE == stbInfo->iface || STMT_IFACE == stbInfo->iface) {
256✔
4057
        bind_ts_array = benchCalloc(1, sizeof(int64_t)*g_arguments->prepared_rand, true);
44✔
4058
        initTsArray(bind_ts_array, stbInfo);
44✔
4059
    }
4060
    
4061

4062
    for (int32_t i = 0; i < nthreads && !g_arguments->terminate; i++) {
1,504!
4063
        // set table
4064
        threadInfo *pThreadInfo = infos + i;
1,248✔
4065
        pThreadInfo->threadID   = i;
1,248✔
4066
        pThreadInfo->dbInfo     = database;
1,248✔
4067
        pThreadInfo->stbInfo    = stbInfo;
1,248✔
4068
        pThreadInfo->start_time = stbInfo->startTimestamp;
1,248✔
4069
        pThreadInfo->pos        = 0;
1,248✔
4070
        pThreadInfo->samplePos  = 0;
1,248✔
4071
        pThreadInfo->totalInsertRows = 0;
1,248✔
4072
        if (STMT2_IFACE == stbInfo->iface || STMT_IFACE == stbInfo->iface) {
1,248✔
4073
            pThreadInfo->bind_ts_array = benchCalloc(1, sizeof(int64_t)*g_arguments->prepared_rand, true);
197✔
4074
            memcpy(pThreadInfo->bind_ts_array, bind_ts_array, sizeof(int64_t)*g_arguments->prepared_rand);
197✔
4075
            
4076
        }
4077
        if (g_arguments->bind_vgroup) {
1,248✔
4078
            for (int32_t j = vgNext; j < database->vgroups; j++) {
20!
4079
                SVGroup *vg = benchArrayGet(database->vgArray, j);
20✔
4080
                if (0 == vg->tbCountPerVgId) {
20✔
4081
                    continue;
6✔
4082
                }
4083
                pThreadInfo->vg               = vg;
14✔
4084
                pThreadInfo->ntables          = vg->tbCountPerVgId;
14✔
4085
                pThreadInfo->start_table_from = 0;
14✔
4086
                pThreadInfo->end_table_to     = vg->tbCountPerVgId - 1;
14✔
4087
                vgNext                        = j + 1;
14✔
4088
                break;
14✔
4089
            }            
4090
        } else {
4091
            pThreadInfo->start_table_from = tbNext;
1,234✔
4092
            pThreadInfo->ntables          = i < mod ? div + 1 : div;
1,234✔
4093
            pThreadInfo->end_table_to     = i < mod ? tbNext + div : tbNext + div - 1;
1,234✔
4094
            tbNext                        = pThreadInfo->end_table_to + 1;
1,234✔
4095
        }
4096

4097
        // init conn
4098
        pthread_create(pids + i, NULL, genInsertTheadInfo,   pThreadInfo);
1,248✔
4099
        threadCnt ++;
1,248✔
4100
    }
4101
    
4102
    // wait threads
4103
    for (int i = 0; i < threadCnt; i++) {
1,504✔
4104
        infoPrint("init pthread_join %d ...\n", i);
1,248✔
4105
        pthread_join(pids[i], NULL);
1,248✔
4106
    }
4107

4108
    if (bind_ts_array) {
256✔
4109
        tmfree(bind_ts_array);
44✔
4110
    }
4111
    
4112
    tmfree(pids);
256✔
4113
    if (g_fail) {
256!
4114
       return -1;
×
4115
    }
4116
    return 0;
256✔
4117
}
4118

4119
#ifdef LINUX
4120
#define EMPTY_SLOT -1
4121
// run with limit thread
4122
// Run `nthreads` insert workers while keeping at most `limitThread` of them
// alive at the same time (used when bind-vgroup yields more workers than the
// configured thread count).
//
// database:    unused here, kept for a signature parallel to runInsertThread().
// stbInfo:     super table; interlaceRows selects the worker routine.
// nthreads:    number of entries in infos/pids that must be executed.
// limitThread: number of concurrently running workers (slot count).
// infos/pids:  per-worker argument and thread id arrays, indexed by worker.
//
// Returns 0 when every worker was launched, -1 when a worker could not be
// created. Every worker that was started has been joined on return, so the
// caller may release infos/pids afterwards.
int32_t runInsertLimitThread(SDataBase* database, SSuperTable* stbInfo, int32_t nthreads, int32_t limitThread, threadInfo *infos, pthread_t *pids) {
    infoPrint("run with bind vgroups limit thread. limit threads=%d nthread=%d\n", limitThread, nthreads);

    // both routines are handed to pthread_create, pick the one for this run once
    void *(*worker)(void *) = (stbInfo->interlaceRows > 0) ? syncWriteInterlace : syncWriteProgressive;

    // each slot stores the infos/pids index of the worker running in it
    int32_t* slot = benchCalloc(limitThread, sizeof(int32_t), false);
    int32_t  t    = 0;  // index of the next worker to launch
    int32_t  ret  = 0;  // becomes -1 once a launch fails; no more launches after that
    for (int32_t i = 0; i < limitThread; i++) {
        slot[i] = EMPTY_SLOT;
    }

    while (!g_arguments->terminate) {
        int32_t emptySlot = 0;
        for (int32_t i = 0; i < limitThread; i++) {
            int32_t idx = slot[i];
            // check whether the worker occupying this slot has ended
            if (idx != EMPTY_SLOT) {
                if (pthread_tryjoin_np(pids[idx], NULL) == EBUSY) {
                    // worker still running, wait before looking at the next slot
                    toolsMsleep(2000);
                } else {
                    // worker ended (and was joined), release the slot
                    infoPrint("slot[%d] finished tidx=%d. completed thread count=%d\n", i, slot[i], t);
                    slot[i] = EMPTY_SLOT;
                }
            }

            if (slot[i] == EMPTY_SLOT && t < nthreads && ret == 0) {
                // slot is free, launch the next pending worker in it
                int code = pthread_create(pids + t, NULL, worker, infos + t);
                if (code != 0) {
                    // pids[t] is not a valid thread id: never put it into a slot,
                    // otherwise it would be polled with pthread_tryjoin_np later
                    errorPrint("slot[%d] failed to create thread tidx=%d, error code: %d\n", i, t, code);
                    ret = -1;
                } else {
                    slot[i] = t;
                    t++;
                    infoPrint("slot[%d] start new thread tidx=%d. \n", i, slot[i]);
                }
            }

            // count free slots to detect completion
            if (slot[i] == EMPTY_SLOT) {
                emptySlot++;
            }
        }

        // all slots free means nothing is running and nothing is left to launch
        if (emptySlot == limitThread) {
            debugPrint("all threads(%d) is run finished.\n", nthreads);
            break;
        } else {
            debugPrint("current thread index=%d all thread=%d\n", t, nthreads);
        }
    }

    // The loop can also end because terminate was set while workers were still
    // running. Join them here (as runInsertThread does) so that nobody frees
    // infos/pids underneath a live worker. No-op after a normal completion.
    for (int32_t i = 0; i < limitThread; i++) {
        if (slot[i] != EMPTY_SLOT) {
            infoPrint("pthread_join %d ...\n", slot[i]);
            pthread_join(pids[slot[i]], NULL);
            slot[i] = EMPTY_SLOT;
        }
    }

    tmfree(slot);

    return ret;
}
4182
#endif
4183

4184
// run
4185
// Launch one insert worker per entry of `infos` and wait for all of them.
//
// database:   unused here.
// stbInfo:    super table; interlaceRows selects the worker routine.
// nthreads:   number of entries in infos/pids.
// infos/pids: per-worker argument and thread id arrays.
//
// Launching stops early when g_arguments->terminate is set or when a worker
// cannot be created. Only workers that were really started are joined.
// Returns 0 on success, -1 when a worker could not be created.
int32_t runInsertThread(SDataBase* database, SSuperTable* stbInfo, int32_t nthreads, threadInfo *infos, pthread_t *pids) {
    infoPrint("run insert thread. real nthread=%d\n", nthreads);

    // both routines are handed to pthread_create, pick the one for this run once
    void *(*worker)(void *) = (stbInfo->interlaceRows > 0) ? syncWriteInterlace : syncWriteProgressive;

    // create threads
    int32_t ret       = 0;
    int     threadCnt = 0;
    for (int i = 0; i < nthreads && !g_arguments->terminate; i++) {
        int code = pthread_create(pids + i, NULL, worker, infos + i);
        if (code != 0) {
            // pids[i] holds no valid thread id; stop here so it is never joined
            errorPrint("failed to create insert thread %d, error code: %d\n", i, code);
            ret = -1;
            break;
        }
        threadCnt++;
    }

    // wait threads
    for (int i = 0; i < threadCnt; i++) {
        infoPrint("pthread_join %d ...\n", i);
        pthread_join(pids[i], NULL);
    }

    return ret;
}
4207

4208

4209
// exit and free resource
4210
// Release every per-thread resource of an insert run, merge the per-thread
// statistics and print the final report.
//
// database/stbInfo: target of the run; stbInfo->iface decides which buffers
//                   each thread owns and therefore what has to be released.
// nthreads:         number of entries in infos.
// infos, pids:      released here with tmfree(); the caller must not touch
//                   them after this call.
// spend:            handed to printTotalDelay() unchanged.
//
// Returns 0 on success, -1 when g_fail is set or printTotalDelay() fails.
int32_t exitInsertThread(SDataBase* database, SSuperTable* stbInfo, int32_t nthreads, threadInfo *infos, pthread_t *pids, int64_t spend) {

    if (g_arguments->terminate)  toolsMsleep(100);

    BArray *  total_delay_list = benchArrayInit(1, sizeof(int64_t));
    int64_t   totalDelay = 0;
    int64_t   totalDelay1 = 0;
    int64_t   totalDelay2 = 0;
    int64_t   totalDelay3 = 0;
    uint64_t  totalInsertRows = 0;

    // free threads resource
    for (int i = 0; i < nthreads; i++) {
        threadInfo *pThreadInfo = infos + i;
        // free check sql
        if (pThreadInfo->csql) {
            tmfree(pThreadInfo->csql);
            pThreadInfo->csql = NULL;
        }

        // close conn
        int protocol = stbInfo->lineProtocol;
        switch (stbInfo->iface) {
            case REST_IFACE:
                if (g_arguments->terminate)
                    toolsMsleep(100);
                destroySockFd(pThreadInfo->sockfd);
                if (stbInfo->interlaceRows > 0) {
                    free_ds(&pThreadInfo->buffer);
                } else {
                    tmfree(pThreadInfo->buffer);
                    pThreadInfo->buffer = NULL;
                }
                break;
            case SML_REST_IFACE:
                if (g_arguments->terminate)
                    toolsMsleep(100);
                tmfree(pThreadInfo->buffer);
                // on-purpose no break here: the schemaless buffers below are
                // released for SML_REST_IFACE as well
                /* fall through */
            case SML_IFACE:
                if (TSDB_SML_JSON_PROTOCOL != protocol
                        && SML_JSON_TAOS_FORMAT != protocol) {
                    // the arrays are indexed below, so make sure they exist first
                    if (pThreadInfo->sml_tags) {
                        for (int t = 0; t < pThreadInfo->ntables; t++) {
                            tmfree(pThreadInfo->sml_tags[t]);
                        }
                    }
                    if (pThreadInfo->lines) {
                        for (int j = 0; j < g_arguments->reqPerReq; j++) {
                            tmfree(pThreadInfo->lines[j]);
                        }
                    }
                    tmfree(pThreadInfo->sml_tags);
                    pThreadInfo->sml_tags = NULL;
                } else {
                    if (pThreadInfo->sml_tags_json_array) {
                        for (int t = 0; t < pThreadInfo->ntables; t++) {
                            tmfree(pThreadInfo->sml_tags_json_array[t]);
                        }
                    }
                    tmfree(pThreadInfo->sml_tags_json_array);
                    pThreadInfo->sml_tags_json_array = NULL;
                    if (pThreadInfo->sml_json_tags) {
                        tools_cJSON_Delete(pThreadInfo->sml_json_tags);
                        pThreadInfo->sml_json_tags = NULL;
                    }
                    if (pThreadInfo->json_array) {
                        tools_cJSON_Delete(pThreadInfo->json_array);
                        pThreadInfo->json_array = NULL;
                    }
                }
                if (pThreadInfo->lines) {
                    if ((0 == stbInfo->interlaceRows)
                            && (TSDB_SML_JSON_PROTOCOL == protocol)) {
                        tmfree(pThreadInfo->lines[0]);
                        if (pThreadInfo->sml_json_value_array) {
                            for (int t = 0; t < pThreadInfo->ntables; t++) {
                                tmfree(pThreadInfo->sml_json_value_array[t]);
                            }
                        }
                        tmfree(pThreadInfo->sml_json_value_array);
                        pThreadInfo->sml_json_value_array = NULL;
                    }
                    tmfree(pThreadInfo->lines);
                    pThreadInfo->lines = NULL;
                }
                break;

            case STMT_IFACE:
                // close stmt; conn is checked because it may be NULL (see the
                // guarded close at the end of this loop)
                if (pThreadInfo->conn && pThreadInfo->conn->stmt) {
                    taos_stmt_close(pThreadInfo->conn->stmt);
                    pThreadInfo->conn->stmt = NULL;
                }
                // on-purpose no break here: the bind buffers below are shared
                // by the stmt and stmt2 interfaces
                /* fall through */
            case STMT2_IFACE:
                // close stmt2
                if (pThreadInfo->conn && pThreadInfo->conn->stmt2) {
                    taos_stmt2_close(pThreadInfo->conn->stmt2);
                    pThreadInfo->conn->stmt2 = NULL;
                }

                tmfree(pThreadInfo->bind_ts);
                tmfree(pThreadInfo->bind_ts_array);
                tmfree(pThreadInfo->bindParams);

                // free tagsStmt
                if (pThreadInfo->tagsStmt) {
                    // free child
                    for (int k = 0; k < pThreadInfo->tagsStmt->size; k++) {
                        Field * tag = benchArrayGet(pThreadInfo->tagsStmt, k);
                        tmfree(tag->stmtData.data);
                        tag->stmtData.data = NULL;
                        tmfree(tag->stmtData.is_null);
                        tag->stmtData.is_null = NULL;
                        tmfree(tag->stmtData.lengths);
                        tag->stmtData.lengths = NULL;
                    }
                    // free parent
                    benchArrayDestroy(pThreadInfo->tagsStmt);
                    pThreadInfo->tagsStmt = NULL;
                }

                // free lengths; the loop bound is inclusive, i.e. cols->size + 1
                // entries are released
                if (pThreadInfo->lengths) {
                    for (int c = 0; c <= stbInfo->cols->size; c++) {
                        tmfree(pThreadInfo->lengths[c]);
                    }
                    tmfree(pThreadInfo->lengths);
                    pThreadInfo->lengths = NULL;
                }
                break;

            case TAOSC_IFACE:
                if (stbInfo->interlaceRows > 0) {
                    free_ds(&pThreadInfo->buffer);
                } else {
                    tmfree(pThreadInfo->buffer);
                    pThreadInfo->buffer = NULL;
                }
                break;

            default:
                break;
        }

        // merge the statistics of this thread into the totals
        totalInsertRows += pThreadInfo->totalInsertRows;
        totalDelay += pThreadInfo->totalDelay;
        totalDelay1 += pThreadInfo->totalDelay1;
        totalDelay2 += pThreadInfo->totalDelay2;
        totalDelay3 += pThreadInfo->totalDelay3;
        if (pThreadInfo->delayList) {
            benchArrayAddBatch(total_delay_list, pThreadInfo->delayList->pData,
                    pThreadInfo->delayList->size, true);
            // NOTE(review): only the array header is released here; presumably
            // benchArrayAddBatch(..., true) takes care of pData - confirm
            tmfree(pThreadInfo->delayList);
            pThreadInfo->delayList = NULL;
        }

        //  free conn
        if (pThreadInfo->conn) {
            closeBenchConn(pThreadInfo->conn);
            pThreadInfo->conn = NULL;
        }
    }

    // calculate result
    qsort(total_delay_list->pData, total_delay_list->size,
            total_delay_list->elemSize, compare);

    if (g_arguments->terminate)  toolsMsleep(100);

    tmfree(pids);
    tmfree(infos);

    // print result
    int ret = printTotalDelay(database, totalDelay, totalDelay1, totalDelay2, totalDelay3,
                              total_delay_list, nthreads, totalInsertRows, spend);
    benchArrayDestroy(total_delay_list);
    if (g_fail || ret != 0) {
        return -1;
    }
    return 0;
}
4380

4381
// Insert data into one super table using multiple worker threads.
//
// Steps: validate arguments, determine the table count, split the tables
// between threads (or derive the threads from the vgroups when bind_vgroup is
// set), initialise the per-thread state, run the workers and finally release
// everything and print the report through exitInsertThread().
//
// Returns 0 on success and a non-zero value when any step fails.
static int startMultiThreadInsertData(SDataBase* database, SSuperTable* stbInfo) {
    if ((stbInfo->iface == SML_IFACE || stbInfo->iface == SML_REST_IFACE)
            && !stbInfo->use_metric) {
        errorPrint("%s", "schemaless cannot work without stable\n");
        return -1;
    }

    // check argument valid
    preProcessArgument(stbInfo);

    // ntable; a negative count is rejected too, it would otherwise end up as
    // the thread count below
    int64_t ntables = obtainTableCount(database, stbInfo);
    if (ntables <= 0) {
        errorPrint("insert table count is zero. %s.%s\n", database->dbName, stbInfo->stbName);
        return -1;
    }

    // assign table to thread
    int32_t  nthreads  = g_arguments->nthreads;
    int64_t  div       = 0;  // ntable / nthread  division
    int64_t  mod       = 0;  // ntable % nthread
    int64_t  spend     = 0;

    if (g_arguments->bind_vgroup) {
        nthreads = assignTableToThread(database, stbInfo);
        if (nthreads == 0) {
            errorPrint("bind vgroup assign theads count is zero. %s.%s\n", database->dbName, stbInfo->stbName);
            return -1;
        }
    } else {
        if (nthreads == 0) {
            errorPrint("argument thread_count can not be zero. %s.%s\n", database->dbName, stbInfo->stbName);
            return -1;
        }
        div = ntables / nthreads;
        if (div < 1) {
            // fewer tables than threads: one thread per table
            nthreads = (int32_t)ntables;
            div = 1;
        }
        mod = ntables % nthreads;
    }

    // init each thread information
    pthread_t   *pids  = benchCalloc(1, nthreads * sizeof(pthread_t),  true);
    threadInfo  *infos = benchCalloc(1, nthreads * sizeof(threadInfo), true);

    // init
    int32_t ret = initInsertThread(database, stbInfo, nthreads, infos, div, mod);
    if (ret != 0) {
        errorPrint("init insert thread failed. %s.%s\n", database->dbName, stbInfo->stbName);
        tmfree(pids);
        tmfree(infos);
        return ret;
    }

    infoPrint("Estimate memory usage: %.2fMB\n", (double)g_memoryUsage / 1048576);
    prompt(0);

    // run
    int32_t runRet = 0;
    int64_t start  = toolsGetTimestampUs();
    if (g_arguments->bind_vgroup && g_arguments->nthreads < nthreads) {
        // need many batch execute all threads
#ifdef LINUX
        runRet = runInsertLimitThread(database, stbInfo, nthreads, g_arguments->nthreads, infos, pids);
#else
        runRet = runInsertThread(database, stbInfo, nthreads, infos, pids);
#endif
    } else {
        // only one batch execute all threads
        runRet = runInsertThread(database, stbInfo, nthreads, infos, pids);
    }

    // never report a zero or negative duration (equal timestamps, or a clock
    // that stepped backwards between the two reads)
    int64_t end = toolsGetTimestampUs();
    spend = (end > start) ? (end - start) : 1;

    // exit: always executed, it releases infos/pids and the thread resources
    ret = exitInsertThread(database, stbInfo, nthreads, infos, pids, spend);
    if (runRet != 0) {
        // keep a failure of the run phase instead of overwriting it
        return runRet;
    }
    return ret;
}
4466

4467
// Query how many rows the super table dbName.stbName currently holds.
//
// Runs "SELECT COUNT(*)" through `taos`.
// Returns -1 when the query fails, 0 when no row is returned, otherwise the
// row count. The count is a 64-bit value on the wire; it is clamped to
// INT32_MAX so that a large table never turns into a negative (error-like)
// result through narrowing.
static int getStbInsertedRows(char* dbName, char* stbName, TAOS* taos) {
    char command[SHORT_1K_SQL_BUFF_LEN];
    snprintf(command, SHORT_1K_SQL_BUFF_LEN, "SELECT COUNT(*) FROM %s.%s",
             dbName, stbName);
    TAOS_RES* res = taos_query(taos, command);
    int code = taos_errno(res);
    if (code != 0) {
        // NOTE(review): res is handed to printErrCmdCodeStr and not released
        // here - presumably that helper frees it; confirm before adding a free
        printErrCmdCodeStr(command, code, res);
        return -1;
    }

    int rows = 0;
    TAOS_ROW row = taos_fetch_row(res);
    if (row != NULL && row[0] != NULL) {
        int64_t count = *(int64_t*)row[0];
        rows = (count > INT32_MAX) ? INT32_MAX : (int)count;
    }
    taos_free_result(res);
    return rows;
}
4487

4488
// Build and execute the "CREATE sma INDEX" statement for one tsma definition.
//
// tsma:    definition (name, func, interval, sliding and optional custom clause).
// conn:    connection the statement is executed on.
// stbName: super table the index is created on.
//
// Nothing is executed when the statement does not fit into the command
// buffer; a truncated statement would otherwise be sent to the server.
static void create_tsma(TSMA* tsma, SBenchConn* conn, char* stbName) {
    char command[SHORT_1K_SQL_BUFF_LEN];
    int len = snprintf(command, SHORT_1K_SQL_BUFF_LEN,
                       "CREATE sma INDEX %s ON %s function(%s) "
                       "INTERVAL (%s) SLIDING (%s)",
                       tsma->name, stbName, tsma->func,
                       tsma->interval, tsma->sliding);
    // snprintf returns the length it wanted to write: a value >= the buffer
    // size means truncation and must not be used as an offset into command
    if (len < 0 || len >= SHORT_1K_SQL_BUFF_LEN) {
        errorPrint("create tsma command is too long for tsma (%s)\n", tsma->name);
        return;
    }
    if (tsma->custom) {
        int remain = SHORT_1K_SQL_BUFF_LEN - len;
        int extra  = snprintf(command + len, remain, " %s", tsma->custom);
        if (extra < 0 || extra >= remain) {
            errorPrint("create tsma command is too long for tsma (%s)\n", tsma->name);
            return;
        }
    }
    int code = queryDbExecCall(conn, command);
    if (code == 0) {
        infoPrint("successfully create tsma with command <%s>\n", command);
    }
}
×
4504

4505
static void* create_tsmas(void* args) {
×
4506
    tsmaThreadInfo* pThreadInfo = (tsmaThreadInfo*) args;
×
4507
    int inserted_rows = 0;
×
4508
    SBenchConn* conn = initBenchConn(pThreadInfo->dbName);
×
4509
    if (NULL == conn) {
×
4510
        return NULL;
×
4511
    }
4512
    int finished = 0;
×
4513
    if (taos_select_db(conn->taos, pThreadInfo->dbName)) {
×
4514
        errorPrint("failed to use database (%s)\n", pThreadInfo->dbName);
×
4515
        closeBenchConn(conn);
×
4516
        return NULL;
×
4517
    }
4518
    while (finished < pThreadInfo->tsmas->size && inserted_rows >= 0) {
×
4519
        inserted_rows = (int)getStbInsertedRows(
×
4520
                pThreadInfo->dbName, pThreadInfo->stbName, conn->taos);
4521
        for (int i = 0; i < pThreadInfo->tsmas->size; i++) {
×
4522
            TSMA* tsma = benchArrayGet(pThreadInfo->tsmas, i);
×
4523
            if (!tsma->done &&  inserted_rows >= tsma->start_when_inserted) {
×
4524
                create_tsma(tsma, conn, pThreadInfo->stbName);
×
4525
                tsma->done = true;
×
4526
                finished++;
×
4527
                break;
×
4528
            }
4529
        }
4530
        toolsMsleep(10);
×
4531
    }
4532
    benchArrayDestroy(pThreadInfo->tsmas);
×
4533
    closeBenchConn(conn);
×
4534
    return NULL;
×
4535
}
4536

4537
void changeGlobalIface() {
209✔
4538
    if (g_arguments->databases->size == 1) {
209✔
4539
            SDataBase *db = benchArrayGet(g_arguments->databases, 0);
208✔
4540
            if (db && db->superTbls->size == 1) {
208!
4541
                SSuperTable *stb = benchArrayGet(db->superTbls, 0);
178✔
4542
                if (stb) {
178!
4543
                    if(g_arguments->iface != stb->iface) {
178✔
4544
                        infoPrint("only 1 db 1 super table, g_arguments->iface(%d) replace with stb->iface(%d) \n", g_arguments->iface, stb->iface);
14✔
4545
                        g_arguments->iface = stb->iface;
14✔
4546
                    }
4547
                }
4548
            }
4549
    }
4550
}
209✔
4551

4552
int insertTestProcess() {
209✔
4553
    prompt(0);
209✔
4554

4555
    encodeAuthBase64();
209✔
4556
    // if only one stable, global iface same with stable->iface
4557
    changeGlobalIface();
209✔
4558

4559
    // move from loop to here
4560
    if (isRest(g_arguments->iface)) {
209✔
4561
        if (0 != convertServAddr(g_arguments->iface,
9!
4562
                                 false,
4563
                                 1)) {
4564
            return -1;
×
4565
        }
4566
    }    
4567

4568
    //loop create database 
4569
    for (int i = 0; i < g_arguments->databases->size; i++) {
410✔
4570
        SDataBase * database = benchArrayGet(g_arguments->databases, i);
210✔
4571

4572
        if (database->drop && !(g_arguments->supplementInsert)) {
210✔
4573
            if (database->superTbls && database->superTbls->size > 0) {
198!
4574
                SSuperTable * stbInfo = benchArrayGet(database->superTbls, 0);
197✔
4575
                if (stbInfo && isRest(stbInfo->iface)) {
197!
4576
                    if (0 != convertServAddr(stbInfo->iface,
10!
4577
                                             stbInfo->tcpTransfer,
10✔
4578
                                             stbInfo->lineProtocol)) {
10✔
4579
                        return -1;
×
4580
                    }
4581
                }
4582
            }
4583

4584
            if (createDatabase(database)) {
198✔
4585
                errorPrint("failed to create database (%s)\n",
9!
4586
                        database->dbName);
4587
                return -1;
9✔
4588
            }
4589
            succPrint("created database (%s)\n", database->dbName);
189!
4590
        } else if(g_arguments->bind_vgroup) {
12!
4591
            // database already exist, get vgroups from server
4592
            SBenchConn* conn = initBenchConn(NULL);
×
4593
            if (conn) {
×
4594
                int32_t vgroups = getVgroupsNative(conn, database);
×
4595
                if (vgroups <=0) {
×
4596
                    closeBenchConn(conn);
×
4597
                    errorPrint("Database %s's vgroups is zero , db exist case.\n", database->dbName);
×
4598
                    return -1;
×
4599
                }
4600
                closeBenchConn(conn);
×
4601
                succPrint("Database (%s) get vgroups num is %d from server.\n", database->dbName, vgroups);
×
4602
            }
4603
        }
4604
    }
4605

4606
    // create super table && fill child tables && prepareSampleData
4607
    for (int i = 0; i < g_arguments->databases->size; i++) {
401✔
4608
        SDataBase * database = benchArrayGet(g_arguments->databases, i);
201✔
4609
        if (database->superTbls) {
201!
4610
            for (int j = 0; j < database->superTbls->size; j++) {
474✔
4611
                SSuperTable * stbInfo = benchArrayGet(database->superTbls, j);
273✔
4612
                if (stbInfo->iface != SML_IFACE
273✔
4613
                        && stbInfo->iface != SML_REST_IFACE
230✔
4614
                        && !stbInfo->childTblExists) {
222✔
4615
                    int code = getSuperTableFromServer(database, stbInfo);
211✔
4616
                    if (code == TSDB_CODE_FAILED) {
211!
4617
                        return -1;
×
4618
                    }
4619
                    
4620
                    // with create table if not exists, so if exist, can not report failed
4621
                    if (createSuperTable(database, stbInfo)) {
211!
4622
                        return -1;
×
4623
                    }
4624
                    
4625
                }
4626
                // fill last ts from super table
4627
                if(stbInfo->autoFillback && stbInfo->childTblExists) {
273!
4628
                    fillSTableLastTs(database, stbInfo);
×
4629
                }
4630

4631
                // calc now 
4632
                if(stbInfo->calcNow) {
273✔
4633
                    calcExprFromServer(database, stbInfo);
5✔
4634
                }
4635

4636
                // check fill child table count valid
4637
                if(fillChildTblName(database, stbInfo) <= 0) {
273✔
4638
                    infoPrint(" warning fill childs table count is zero, db:%s stb: %s \n", database->dbName, stbInfo->stbName);
12✔
4639
                }
4640
                if (0 != prepareSampleData(database, stbInfo)) {
273!
4641
                    return -1;
×
4642
                }
4643

4644
                // early malloc buffer for auto create table
4645
                if((stbInfo->iface == STMT_IFACE || stbInfo->iface == STMT2_IFACE) && stbInfo->autoTblCreating) {
273✔
4646
                    prepareTagsStmt(stbInfo);
3✔
4647
                }
4648

4649
                // execute sqls
4650
                if (stbInfo->sqls) {
273✔
4651
                    char **sqls = stbInfo->sqls;
16✔
4652
                    while (*sqls) {
80✔
4653
                        queryDbExec(database, stbInfo, *sqls);
64✔
4654
                        sqls++;
64✔
4655
                    } 
4656
                }
4657
            }
4658
        }
4659
    }
4660

4661
    // tsma
4662
    if (g_arguments->taosc_version == 3) {
200!
4663
        for (int i = 0; i < g_arguments->databases->size; i++) {
401✔
4664
            SDataBase* database = benchArrayGet(g_arguments->databases, i);
201✔
4665
            if (database->superTbls) {
201!
4666
                for (int j = 0; (j < database->superTbls->size
201✔
4667
                        && !g_arguments->terminate); j++) {
474!
4668
                    SSuperTable* stbInfo =
4669
                        benchArrayGet(database->superTbls, j);
273✔
4670
                    if (stbInfo->tsmas == NULL) {
273✔
4671
                        continue;
106✔
4672
                    }
4673
                    if (stbInfo->tsmas->size > 0) {
167!
4674
                        tsmaThreadInfo* pThreadInfo =
4675
                            benchCalloc(1, sizeof(tsmaThreadInfo), true);
×
4676
                        pthread_t tsmas_pid = {0};
×
4677
                        pThreadInfo->dbName = database->dbName;
×
4678
                        pThreadInfo->stbName = stbInfo->stbName;
×
4679
                        pThreadInfo->tsmas = stbInfo->tsmas;
×
4680
                        pthread_create(&tsmas_pid, NULL,
×
4681
                                       create_tsmas, pThreadInfo);
4682
                    }
4683
                }
4684
            }
4685
        }
4686
    }
4687

4688
    if (createChildTables()) return -1;
200!
4689

4690
    // create sub threads for inserting data
4691
    for (int i = 0; i < g_arguments->databases->size; i++) {
399✔
4692
        SDataBase * database = benchArrayGet(g_arguments->databases, i);
201✔
4693
        if (database->superTbls) {
201!
4694
            for (uint64_t j = 0; j < database->superTbls->size; j++) {
470✔
4695
                SSuperTable * stbInfo = benchArrayGet(database->superTbls, j);
271✔
4696
                if (stbInfo->insertRows == 0) {
271✔
4697
                    continue;
14✔
4698
                }
4699
                prompt(stbInfo->non_stop);
257✔
4700
                if (startMultiThreadInsertData(database, stbInfo)) {
257✔
4701
                    return -1;
2✔
4702
                }
4703
            }
4704
        }
4705
    }
4706
    return 0;
198✔
4707
}
4708

4709
//
4710
//     ------- STMT 2 -----------
4711
//
4712

4713
static int32_t stmt2BindAndSubmit(
380✔
4714
        threadInfo *pThreadInfo,
4715
        SChildTable *childTbl,
4716
        int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt, int64_t *delay1,
4717
        int64_t *delay3, int64_t* startTs, int64_t* endTs, int32_t w) {
4718
    
4719
    // create bindV
4720
    int32_t count            = 1;
380✔
4721
    TAOS_STMT2_BINDV * bindv = createBindV(count, 0, 0);
380✔
4722
    TAOS_STMT2 *stmt2        = pThreadInfo->conn->stmt2;
380✔
4723
    SSuperTable *stbInfo     = pThreadInfo->stbInfo;
380✔
4724

4725
    //
4726
    // bind
4727
    //
4728

4729
    // count
4730
    bindv->count = 1;
380✔
4731
    // tbnames
4732
    bindv->tbnames[0] = childTbl->name;
380✔
4733
    // tags
4734
    //bindv->tags[0] = NULL; // Progrssive mode tag put on prepare sql, no need put here
4735
   
4736
    // bind_cols
4737
    uint32_t batch = (g_arguments->reqPerReq > stbInfo->insertRows - i) ? (stbInfo->insertRows - i) : g_arguments->reqPerReq;
380✔
4738
    int32_t n = 0;
380✔
4739
    int64_t pos = i % g_arguments->prepared_rand;
380✔
4740

4741
    // adjust batch about pos
4742
    if(g_arguments->prepared_rand - pos < batch ) {
380!
4743
        debugPrint("prepared_rand(%" PRId64 ") is not a multiple of num_of_records_per(%d), the batch size can be modify. before=%d after=%d\n", 
×
4744
                    (int64_t)g_arguments->prepared_rand, (int32_t)g_arguments->reqPerReq, (int32_t)batch, (int32_t)(g_arguments->prepared_rand - pos));
4745
        batch = g_arguments->prepared_rand - pos;
×
4746
    } 
4747

4748
    if (batch == 0) {
380!
4749
        infoPrint("batch size is zero. pos = %"PRId64"\n", pos);
×
4750
        return 0;
×
4751
    }
4752

4753
    uint32_t generated = bindVColsProgressive(bindv, 0, pThreadInfo, batch, *timestamp, pos, childTbl, pkCur, pkCnt, &n);
380✔
4754
    if(generated == 0) {
381!
4755
        errorPrint( "get cols data bind information failed. table: %s\n", childTbl->name);
×
4756
        freeBindV(bindv);
×
4757
        return -1;
×
4758
    }
4759
    *timestamp += n * stbInfo->timestamp_step;
381✔
4760

4761
    if (g_arguments->debug_print) {
381✔
4762
        showBindV(bindv, stbInfo->tags, stbInfo->cols);
20✔
4763
    }
4764

4765
    // bind and submit
4766
    int32_t code = submitStmt2(pThreadInfo, bindv, delay1, delay3, startTs, endTs, &generated, w);
381✔
4767
    // free
4768
    freeBindV(bindv);
381✔
4769

4770
    if(code != 0) {
381!
4771
        errorPrint( "failed submitStmt2() progressive mode, table: %s . engine error: %s\n", childTbl->name, taos_stmt2_error(stmt2));
×
4772
        return code;
×
4773
    } else {
4774
        debugPrint("succ submitStmt2 progressive mode. table=%s batch=%d pos=%" PRId64 " ts=%" PRId64 " generated=%d\n",
381✔
4775
                childTbl->name, batch, pos, *timestamp, generated);
4776
        return generated;
381✔
4777
    }
4778
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc