• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4640

17 Jul 2025 10:48AM UTC coverage: 3.863%. Remained the same
#4640

push

travis-ci

web-flow
Merge pull request #31990 from taosdata/3.0

3.0

230 of 7082 branches covered (3.25%)

Branch coverage included in aggregate %.

0 of 62 new or added lines in 6 files covered. (0.0%)

1617 existing lines in 3 files now uncovered.

382 of 8759 relevant lines covered (4.36%)

1.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/tools/taos-tools/src/benchInsert.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the MIT license as published by the Free Software
6
 * Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 */
12

13
#include <bench.h>
14
#include "common.h"
15
#include "benchLog.h"
16
#include "wrapDb.h"
17
#include <benchData.h>
18
#include <benchInsertMix.h>
19

20
static int32_t stmt2BindAndSubmit(
21
        threadInfo *pThreadInfo,
22
        SChildTable *childTbl,
23
        int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt, int64_t *delay1,
24
        int64_t *delay3, int64_t* startTs, int64_t* endTs, int32_t w);
25
TAOS_STMT2* initStmt2(TAOS* taos, bool single);        
26

27
/*
 * Release the `pids` and `infos` allocations of the enclosing function and
 * return -1 from it. Both identifiers must be in scope where the macro is
 * expanded.
 */
#define FREE_PIDS_INFOS_RETURN_MINUS_1()            \
    do {                                            \
        tmfree(pids);                               \
        tmfree(infos);                              \
        return -1;                                  \
    } while (0)

/*
 * Close the thread's connection (if any), destroy its delay list and release
 * `pids` / `infos`. Expects `pThreadInfo`, `pids` and `infos` to be in scope
 * where the macro is expanded. Unlike the macro above it does not return.
 *
 * The last line deliberately carries no trailing backslash, so the macro
 * ends here and cannot absorb whatever line follows it.
 */
#define FREE_RESOURCE()                             \
    do {                                            \
        if (pThreadInfo->conn)                      \
            closeBenchConn(pThreadInfo->conn);      \
        benchArrayDestroy(pThreadInfo->delayList);  \
        tmfree(pids);                               \
        tmfree(infos);                              \
    } while (0)
42

43
// REST
44
static int getSuperTableFromServerRest(
×
45
    SDataBase* database, SSuperTable* stbInfo, char *command) {
46

47
    // TODO(zero): it will create super table based on this error code.
48
    return TSDB_CODE_NOT_FOUND;
×
49
    // TODO(me): finish full implementation
50
#if 0
51
    int sockfd = createSockFd();
52
    if (sockfd < 0) {
53
        return -1;
54
    }
55

56
    int code = postProcessSql(command,
57
                         database->dbName,
58
                         database->precision,
59
                         REST_IFACE,
60
                         0,
61
                         g_arguments->port,
62
                         false,
63
                         sockfd,
64
                         NULL);
65

66
    destroySockFd(sockfd);
67
#endif   // 0
68
}
69

70
static int getSuperTableFromServerTaosc(
×
71
        SDataBase *database, SSuperTable *stbInfo, char *command) {
72
    TAOS_RES *res;
73
    TAOS_ROW row = NULL;
×
74
    SBenchConn *conn = initBenchConn();
×
75
    if (NULL == conn) {
×
76
        return TSDB_CODE_FAILED;
×
77
    }
78

79
    res = taos_query(conn->taos, command);
×
80
    int32_t code = taos_errno(res);
×
81
    if (code != 0) {
×
82
        infoPrint("stable %s does not exist, will create one\n",
×
83
                  stbInfo->stbName);
84
        closeBenchConn(conn);
×
85
        return TSDB_CODE_NOT_FOUND;
×
86
    }
87
    infoPrint("find stable<%s>, will get meta data from server\n",
×
88
              stbInfo->stbName);
89

90
    // Check if the the existing super table matches exactly with the definitions in the JSON file.
91
    // If a hash table were used, the time complexity would be only O(n).
92
    // But taosBenchmark does not incorporate a hash table, the time complexity of the loop traversal is O(n^2).
93
    bool isTitleRow = true;
×
94
    uint32_t tag_count = 0;
×
95
    uint32_t col_count = 0;
×
96

97
    int fieldsNum = taos_num_fields(res);
×
98
    TAOS_FIELD_E* fields = taos_fetch_fields_e(res);
×
99

100
    if (fieldsNum < TSDB_MAX_DESCRIBE_METRIC || !fields) {
×
101
        errorPrint("%s", "failed to fetch fields\n");
×
102
        taos_free_result(res);
×
103
        closeBenchConn(conn);
×
104
        return TSDB_CODE_FAILED;
×
105
    }
106

107
    while ((row = taos_fetch_row(res)) != NULL) {
×
108
        if (isTitleRow) {
×
109
            isTitleRow = false;
×
110
            continue;
×
111
        }
112
        int32_t *lengths = taos_fetch_lengths(res);
×
113
        if (lengths == NULL) {
×
114
            errorPrint("%s", "failed to execute taos_fetch_length\n");
×
115
            taos_free_result(res);
×
116
            closeBenchConn(conn);
×
117
            return TSDB_CODE_FAILED;
×
118
        }
119
        if (strncasecmp((char *) row[TSDB_DESCRIBE_METRIC_NOTE_INDEX], "tag",
×
120
                        strlen("tag")) == 0) {
121
            if (stbInfo->tags == NULL || stbInfo->tags->size == 0 || tag_count >= stbInfo->tags->size) {
×
122
                errorPrint("%s", "existing stable tag mismatched with that defined in JSON\n");
×
123
                taos_free_result(res);
×
124
                closeBenchConn(conn);
×
125
                return TSDB_CODE_FAILED;
×
126
            }
127
            uint8_t tagType = convertStringToDatatype(
×
128
                    (char *) row[TSDB_DESCRIBE_METRIC_TYPE_INDEX],
×
129
                    lengths[TSDB_DESCRIBE_METRIC_TYPE_INDEX], &(fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].precision));
×
130
            char *tagName = (char *) row[TSDB_DESCRIBE_METRIC_FIELD_INDEX];
×
131
            if (!searchBArray(stbInfo->tags, tagName,
×
132
                              lengths[TSDB_DESCRIBE_METRIC_FIELD_INDEX], tagType)) {
133
                errorPrint("existing stable tag:%s not found in JSON tags.\n", tagName);
×
134
                taos_free_result(res);
×
135
                closeBenchConn(conn);
×
136
                return TSDB_CODE_FAILED;
×
137
            }
138
            tag_count += 1;
×
139
        } else {
140
            if (stbInfo->cols == NULL || stbInfo->cols->size == 0 || col_count >= stbInfo->cols->size) {
×
141
                errorPrint("%s", "existing stable column mismatched with that defined in JSON\n");
×
142
                taos_free_result(res);
×
143
                closeBenchConn(conn);
×
144
                return TSDB_CODE_FAILED;
×
145
            }
146
            uint8_t colType = convertStringToDatatype(
×
147
                    (char *) row[TSDB_DESCRIBE_METRIC_TYPE_INDEX],
×
148
                    lengths[TSDB_DESCRIBE_METRIC_TYPE_INDEX], &(fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].precision));
×
149
            char * colName = (char *) row[TSDB_DESCRIBE_METRIC_FIELD_INDEX];
×
150
            if (!searchBArray(stbInfo->cols, colName,
×
151
                              lengths[TSDB_DESCRIBE_METRIC_FIELD_INDEX], colType)) {
152
                errorPrint("existing stable col:%s not found in JSON cols.\n", colName);
×
153
                taos_free_result(res);
×
154
                closeBenchConn(conn);
×
155
                return TSDB_CODE_FAILED;
×
156
            }
157
            col_count += 1;
×
158
        }
159
    }  // end while
160
    taos_free_result(res);
×
161
    closeBenchConn(conn);
×
162
    if (tag_count != stbInfo->tags->size) {
×
163
        errorPrint("%s", "existing stable tag mismatched with that defined in JSON\n");
×
164
        return TSDB_CODE_FAILED;
×
165
    }
166

167
    if (col_count != stbInfo->cols->size) {
×
168
        errorPrint("%s", "existing stable column mismatched with that defined in JSON\n");
×
169
        return TSDB_CODE_FAILED;
×
170
    }
171

172
    return TSDB_CODE_SUCCESS;
×
173
}
174

175

176
/*
 * Build "DESCRIBE `db`.`stable`" for the given super table and hand it to
 * the REST or the native lookup, depending on stbInfo->iface. The result of
 * the selected lookup is returned unchanged.
 */
static int getSuperTableFromServer(SDataBase* database, SSuperTable* stbInfo) {
    char command[SHORT_1K_SQL_BUFF_LEN] = "\0";

    snprintf(command, sizeof(command),
             "DESCRIBE `%s`.`%s`", database->dbName,
             stbInfo->stbName);

    if (stbInfo->iface == REST_IFACE) {
        return getSuperTableFromServerRest(database, stbInfo, command);
    }
    return getSuperTableFromServerTaosc(database, stbInfo, command);
}
191

192
static int queryDbExec(SDataBase *database,
×
193
                       SSuperTable *stbInfo, char *command) {
194
    int ret = 0;
×
195
    if (isRest(stbInfo->iface)) {
×
196
        if (0 != convertServAddr(stbInfo->iface, false, 1)) {
×
197
            errorPrint("%s", "Failed to convert server address\n");
×
198
            return -1;
×
199
        }
200
        int sockfd = createSockFd();
×
201
        if (sockfd < 0) {
×
202
            ret = -1;
×
203
        } else {
204
            ret = queryDbExecRest(command,
×
205
                              database->dbName,
206
                              database->precision,
207
                              stbInfo->iface,
×
208
                              stbInfo->lineProtocol,
×
209
                              stbInfo->tcpTransfer,
×
210
                              sockfd);
211
            destroySockFd(sockfd);
×
212
        }
213
    } else {
214
        SBenchConn* conn = initBenchConn();
×
215
        if (NULL == conn) {
×
216
            ret = -1;
×
217
        } else {
218
            ret = queryDbExecCall(conn, command);
×
219
            int32_t trying = g_arguments->keep_trying;
×
220
            while (ret && trying) {
×
221
                infoPrint("will sleep %"PRIu32" milliseconds then re-execute command: %s\n",
×
222
                          g_arguments->trying_interval, command);
223
                toolsMsleep(g_arguments->trying_interval);
×
224
                ret = queryDbExecCall(conn, command);
×
225
                if (trying != -1) {
×
226
                    trying--;
×
227
                }
228
            }
229
            if (0 != ret) {
×
230
                ret = -1;
×
231
            }
232
            closeBenchConn(conn);
×
233
        }
234
    }
235

236
    return ret;
×
237
}
238

239
/*
 * Write the optional "encode '..' compress '..' level '..' " clauses of a
 * column into buf, each one only when the corresponding string is non-empty.
 *
 * Returns the number of characters written; when it is 0, buf is left
 * untouched.
 *
 * NOTE(review): the writes are unbounded (sprintf) and buf's capacity is not
 * passed in, so the caller must size buf for all three values plus the
 * keyword text.
 */
int getCompressStr(Field* col, char* buf) {
    int written = 0;

    if (col->encode[0] != '\0') {
        written += sprintf(&buf[written], "encode \'%s\' ", col->encode);
    }
    if (col->compress[0] != '\0') {
        written += sprintf(&buf[written], "compress \'%s\' ", col->compress);
    }
    if (col->level[0] != '\0') {
        written += sprintf(&buf[written], "level \'%s\' ", col->level);
    }

    return written;
}
253

254
/*
 * Account for one snprintf() result against a buffer of `cap` bytes of which
 * *used are already filled. Returns true and advances *used when the output
 * fitted completely; returns false and leaves *used untouched when
 * snprintf() failed or truncated.
 */
static bool stbBufAdvance(int *used, int written, int64_t cap) {
    if (written < 0 || (int64_t)written >= cap - (int64_t)(*used)) {
        return false;
    }
    *used += written;
    return true;
}

/*
 * Build and execute the CREATE TABLE statement of a super table.
 *
 * Steps:
 *   1. Render the column list (",name type[(len)|(precision,scale)]" plus the
 *      optional PRIMARY KEY marker on the first column and the optional
 *      encode/compress/level clauses).
 *   2. Store "(<primaryKeyName> timestamp<columns>)" in
 *      stbInfo->colsOfCreateChildTable for later child-table creation.
 *   3. If the super table has no tags, stop here and return 0.
 *   4. Render the tag list. A JSON tag ends the list.
 *   5. Assemble the statement with the optional COMMENT / DELAY /
 *      FILE_FACTOR / ROLLUP / MAX_DELAY / WATERMARK / SMA clauses and run it
 *      through queryDbExec().
 *
 * Returns 0 immediately when g_arguments->supplementInsert is set. Otherwise
 * returns the queryDbExec() result, or -1 when a GEOMETRY field is shorter
 * than 21 or the statement does not fit in TSDB_MAX_ALLOWED_SQL_LEN.
 *
 * Overflow policy: when a single column, tag or SMA entry does not fit, the
 * error is logged and the remaining entries of that list are dropped (the
 * statement is still sent). Every intermediate write is checked, so the
 * remaining-size argument passed to snprintf() can never wrap around.
 *
 * All scratch buffers are released on every return path.
 */
static int createSuperTable(SDataBase* database, SSuperTable* stbInfo) {
    if (g_arguments->supplementInsert) {
        return 0;
    }

    int      ret = -1;
    int      len = 0;      // bytes used in colsBuf, later in tagsBuf
    int      length = 0;   // bytes used in command
    int      n = 0;
    bool     jsonTag = false;
    bool     sqlFits = true;
    bool     first_sma = true;
    uint32_t tag_buffer_len = 0;
    char    *tagsBuf = NULL;

    uint32_t col_buffer_len = (TSDB_COL_NAME_LEN + 15 + COMP_NAME_LEN*3) * stbInfo->cols->size;
    char    *colsBuf = benchCalloc(1, col_buffer_len, false);
    char    *command = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, false);

    for (int colIndex = 0; colIndex < stbInfo->cols->size; colIndex++) {
        Field * col = benchArrayGet(stbInfo->cols, colIndex);

        if (col->type == TSDB_DATA_TYPE_GEOMETRY && col->length < 21) {
            errorPrint("%s() LN%d, geometry filed len must be greater than 21 on %d\n", __func__, __LINE__,
                       colIndex);
            goto cleanup;
        }

        if (col->type == TSDB_DATA_TYPE_BINARY ||
            col->type == TSDB_DATA_TYPE_NCHAR ||
            col->type == TSDB_DATA_TYPE_VARBINARY ||
            col->type == TSDB_DATA_TYPE_GEOMETRY) {
            n = snprintf(colsBuf + len, col_buffer_len - len,
                    ",%s %s(%d)", col->name,
                    convertDatatypeToString(col->type), col->length);
        } else if (col->type == TSDB_DATA_TYPE_DECIMAL
                || col->type == TSDB_DATA_TYPE_DECIMAL64) {
            n = snprintf(colsBuf + len, col_buffer_len - len,
                    ",%s %s(%d,%d)", col->name,
                    convertDatatypeToString(col->type), col->precision, col->scale);
        } else {
            n = snprintf(colsBuf + len, col_buffer_len - len,
                    ",%s %s", col->name,
                    convertDatatypeToString(col->type));
        }
        bool fits = stbBufAdvance(&len, n, col_buffer_len);

        // primary key
        if (fits && stbInfo->primary_key && colIndex == 0) {
            n = snprintf(colsBuf + len, col_buffer_len - len, " %s", PRIMARY_KEY);
            fits = stbBufAdvance(&len, n, col_buffer_len);
        }

        // compress key
        if (fits) {
            // Room for the three values plus the keyword text getCompressStr()
            // adds around them ("encode '' compress '' level '' ").
            // NOTE(review): assumes each value is shorter than COMP_NAME_LEN.
            char keys[COMP_NAME_LEN*3 + 32] = "";
            if (getCompressStr(col, keys) > 0) {
                n = snprintf(colsBuf + len, col_buffer_len - len, " %s", keys);
                fits = stbBufAdvance(&len, n, col_buffer_len);
            }
        }

        if (!fits) {
            errorPrint("%s() LN%d, snprintf overflow on %d\n",
                       __func__, __LINE__, colIndex);
            break;
        }
    }

    // save for creating child table
    stbInfo->colsOfCreateChildTable =
        (char *)benchCalloc(len + TIMESTAMP_BUFF_LEN, 1, true);

    snprintf(stbInfo->colsOfCreateChildTable, len + TIMESTAMP_BUFF_LEN,
             "(%s timestamp%s)", stbInfo->primaryKeyName, colsBuf);

    if (stbInfo->tags->size == 0) {
        ret = 0;
        goto cleanup;
    }

    tag_buffer_len = (TSDB_COL_NAME_LEN + 15) * stbInfo->tags->size;
    tagsBuf = benchCalloc(1, tag_buffer_len, false);
    len = 0;

    n = snprintf(tagsBuf + len, tag_buffer_len - len, "(");
    if (!stbBufAdvance(&len, n, tag_buffer_len)) {
        errorPrint("%s() LN%d snprintf overflow\n",
                       __func__, __LINE__);
        goto cleanup;
    }

    for (int tagIndex = 0; tagIndex < stbInfo->tags->size; tagIndex++) {
        Field *tag = benchArrayGet(stbInfo->tags, tagIndex);

        if (tag->type == TSDB_DATA_TYPE_GEOMETRY && tag->length < 21) {
            errorPrint("%s() LN%d, geometry filed len must be greater than 21 on %d\n", __func__, __LINE__,
                       tagIndex);
            goto cleanup;
        }

        if (tag->type == TSDB_DATA_TYPE_JSON) {
            // A JSON tag terminates the list and is written without a
            // trailing comma, so nothing has to be trimmed afterwards.
            n = snprintf(tagsBuf + len, tag_buffer_len - len,
                    "%s json", tag->name);
            if (!stbBufAdvance(&len, n, tag_buffer_len)) {
                errorPrint("%s() LN%d snprintf overflow on %d\n",
                       __func__, __LINE__, tagIndex);
            } else {
                jsonTag = true;
            }
            break;
        }

        if (tag->type == TSDB_DATA_TYPE_BINARY ||
            tag->type == TSDB_DATA_TYPE_NCHAR ||
            tag->type == TSDB_DATA_TYPE_VARBINARY ||
            tag->type == TSDB_DATA_TYPE_GEOMETRY) {
            n = snprintf(tagsBuf + len, tag_buffer_len - len,
                    "%s %s(%d),", tag->name,
                    convertDatatypeToString(tag->type), tag->length);
        } else {
            n = snprintf(tagsBuf + len, tag_buffer_len - len,
                    "%s %s,", tag->name,
                    convertDatatypeToString(tag->type));
        }

        if (!stbBufAdvance(&len, n, tag_buffer_len)) {
            errorPrint("%s() LN%d snprintf overflow on %d\n",
                       __func__, __LINE__, tagIndex);
            break;
        }
    }
    if (!jsonTag) {
        len -= 1;  // step back over the last written character (the trailing comma)
    }
    n = snprintf(tagsBuf + len, tag_buffer_len - len, ")");
    if (!stbBufAdvance(&len, n, tag_buffer_len)) {
        errorPrint("%s() LN%d snprintf overflow\n",
                       __func__, __LINE__);
        goto cleanup;
    }

    n = snprintf(
        command, TSDB_MAX_ALLOWED_SQL_LEN,
        g_arguments->escape_character
            ? "CREATE TABLE IF NOT EXISTS `%s`.`%s` (%s TIMESTAMP%s) TAGS %s"
            : "CREATE TABLE IF NOT EXISTS %s.%s (%s TIMESTAMP%s) TAGS %s",
        database->dbName, stbInfo->stbName, stbInfo->primaryKeyName, colsBuf, tagsBuf);
    sqlFits = stbBufAdvance(&length, n, TSDB_MAX_ALLOWED_SQL_LEN);

    if (sqlFits && stbInfo->comment != NULL) {
        n = snprintf(command + length, TSDB_MAX_ALLOWED_SQL_LEN - length,
                     " COMMENT '%s'", stbInfo->comment);
        sqlFits = stbBufAdvance(&length, n, TSDB_MAX_ALLOWED_SQL_LEN);
    }
    if (sqlFits && stbInfo->delay >= 0) {
        n = snprintf(command + length,
                     TSDB_MAX_ALLOWED_SQL_LEN - length, " DELAY %d",
                     stbInfo->delay);
        sqlFits = stbBufAdvance(&length, n, TSDB_MAX_ALLOWED_SQL_LEN);
    }
    if (sqlFits && stbInfo->file_factor >= 0) {
        n = snprintf(command + length,
                     TSDB_MAX_ALLOWED_SQL_LEN - length, " FILE_FACTOR %f",
                     (float)stbInfo->file_factor / 100);
        sqlFits = stbBufAdvance(&length, n, TSDB_MAX_ALLOWED_SQL_LEN);
    }
    if (sqlFits && stbInfo->rollup != NULL) {
        n = snprintf(command + length,
                     TSDB_MAX_ALLOWED_SQL_LEN - length,
                     " ROLLUP(%s)", stbInfo->rollup);
        sqlFits = stbBufAdvance(&length, n, TSDB_MAX_ALLOWED_SQL_LEN);
    }
    if (sqlFits && stbInfo->max_delay != NULL) {
        n = snprintf(command + length,
                     TSDB_MAX_ALLOWED_SQL_LEN - length,
                     " MAX_DELAY %s", stbInfo->max_delay);
        sqlFits = stbBufAdvance(&length, n, TSDB_MAX_ALLOWED_SQL_LEN);
    }
    if (sqlFits && stbInfo->watermark != NULL) {
        n = snprintf(command + length,
                     TSDB_MAX_ALLOWED_SQL_LEN - length,
                     " WATERMARK %s", stbInfo->watermark);
        sqlFits = stbBufAdvance(&length, n, TSDB_MAX_ALLOWED_SQL_LEN);
    }
    if (!sqlFits) {
        // Continuing would hand snprintf() a wrapped size, so give up instead.
        errorPrint("%s() LN%d snprintf overflow\n",
                       __func__, __LINE__);
        goto cleanup;
    }

    // ttl is not supported on super tables, so no TTL clause is emitted

    for (int i = 0; i < stbInfo->cols->size; i++) {
        Field * col = benchArrayGet(stbInfo->cols, i);
        if (!col->sma) {
            continue;
        }

        if (first_sma) {
            n = snprintf(command + length, TSDB_MAX_ALLOWED_SQL_LEN - length, " SMA(%s", col->name);
            first_sma = false;
        } else {
            n = snprintf(command + length, TSDB_MAX_ALLOWED_SQL_LEN - length, ",%s", col->name);
        }

        if (!stbBufAdvance(&length, n, TSDB_MAX_ALLOWED_SQL_LEN)) {
            errorPrint("%s() LN%d snprintf overflow on %d iteral\n", __func__, __LINE__, i);
            break;
        }
    }
    if (!first_sma) {
        snprintf(command + length, TSDB_MAX_ALLOWED_SQL_LEN - length, ")");
    }
    debugPrint("create stable: <%s>\n", command);

    ret = queryDbExec(database, stbInfo, command);

cleanup:
    tmfree(colsBuf);
    tmfree(tagsBuf);
    tmfree(command);
    return ret;
}
470

471

472
/*
 * Query "SHOW <db>.VGROUPS" twice over `conn`: the first pass counts the
 * vgroups and sizes database->vgArray, the second pass stores the first
 * column of each row as the vgId of the matching array entry.
 *
 * Side effects: sets database->vgroups and replaces database->vgArray.
 *
 * Returns the number of vgroups counted in the first pass, or -1 when either
 * query fails. The connection is left open for the caller.
 *
 * NOTE(review): on a query failure `res` is handed to printErrCmdCodeStr()
 * and not freed here - presumably that helper releases it; confirm.
 */
int32_t getVgroupsNative(SBenchConn *conn, SDataBase *database) {
    int     vgroups = 0;
    char    cmd[SHORT_1K_SQL_BUFF_LEN] = "\0";
    snprintf(cmd, SHORT_1K_SQL_BUFF_LEN,
            g_arguments->escape_character
            ? "SHOW `%s`.VGROUPS"
            : "SHOW %s.VGROUPS",
            database->dbName);

    // pass 1: count the rows
    TAOS_RES* res = taos_query(conn->taos, cmd);
    int code = taos_errno(res);
    if (code) {
        printErrCmdCodeStr(cmd, code, res);
        return -1;
    }

    TAOS_ROW row = NULL;
    while ((row = taos_fetch_row(res)) != NULL) {
        vgroups++;
    }
    debugPrint("%s() LN%d, vgroups: %d\n", __func__, __LINE__, vgroups);
    taos_free_result(res);

    database->vgroups = vgroups;
    database->vgArray = benchArrayInit(vgroups, sizeof(SVGroup));
    for (int32_t v = 0; (v < vgroups
            && !g_arguments->terminate); v++) {
        SVGroup *vg = benchCalloc(1, sizeof(SVGroup), true);
        benchArrayPush(database->vgArray, vg);
    }

    // pass 2: record the vgroup ids
    res = taos_query(conn->taos, cmd);
    code = taos_errno(res);
    if (code) {
        printErrCmdCodeStr(cmd, code, res);
        return -1;
    }

    // Never index past the entries counted in pass 1: the second query may
    // return more rows, and the fill loop above may have stopped early on
    // g_arguments->terminate.
    int32_t vgItem = 0;
    while (vgItem < vgroups
            && !g_arguments->terminate
            && ((row = taos_fetch_row(res)) != NULL)) {
        SVGroup *vg = benchArrayGet(database->vgArray, vgItem);
        if (vg == NULL || row[0] == NULL) {
            break;
        }
        vg->vgId = *(int32_t*)row[0];
        vgItem++;
    }
    taos_free_result(res);

    return vgroups;
}
521

522
/*
 * Render the CREATE DATABASE statement for `database` into `command`.
 *
 * `command` must provide at least SHORT_1K_SQL_BUFF_LEN bytes; every write
 * is bounded by that size, which is the buffer size both callers visible in
 * this file pass.
 *
 * Layout: "CREATE DATABASE IF NOT EXISTS <db>" + one " <name> <value>" per
 * configured item (except vgroups) + optional " VGROUPS <n>" + optional
 * " PRECISION '<ms|us|ns>';".
 *
 * vgroups: a positive g_arguments->inputted_vgroups wins over a "vgroups"
 * item in database->cfgs; the clause is omitted when neither yields a
 * positive value.
 *
 * `remainVnodes` is accepted for interface compatibility and not used.
 *
 * Returns the length of the statement without the PRECISION clause, or -1
 * when the statement head, the VGROUPS clause or the PRECISION clause does
 * not fit. A config item that does not fit is logged and the remaining items
 * are dropped.
 */
int geneDbCreateCmd(SDataBase *database, char *command, int remainVnodes) {
    (void)remainVnodes;

    const int cap = SHORT_1K_SQL_BUFF_LEN;
    int dataLen = 0;
    int n;

    // create database
    n = snprintf(command + dataLen, cap - dataLen,
                g_arguments->escape_character
                    ? "CREATE DATABASE IF NOT EXISTS `%s`"
                    : "CREATE DATABASE IF NOT EXISTS %s",
                    database->dbName);

    if (n < 0 || n >= cap - dataLen) {
        errorPrint("%s() LN%d snprintf overflow\n",
                           __func__, __LINE__);
        return -1;
    }
    dataLen += n;

    int vgroups = g_arguments->inputted_vgroups;

    // append config items
    if (database->cfgs) {
        for (int i = 0; i < database->cfgs->size; i++) {
            SDbCfg* cfg = benchArrayGet(database->cfgs, i);

            // vgroups is emitted separately below
            if (trimCaseCmp(cfg->name, "vgroups") == 0) {
                if (vgroups > 0) {
                    // inputted vgroups by commandline
                    infoPrint("ignore config set vgroups %d\n", cfg->valueint);
                } else {
                    vgroups = cfg->valueint;
                }
                continue;
            }

            if (cfg->valuestring) {
                n = snprintf(command + dataLen, cap - dataLen,
                            " %s %s", cfg->name, cfg->valuestring);
            } else {
                n = snprintf(command + dataLen, cap - dataLen,
                            " %s %d", cfg->name, cfg->valueint);
            }
            if (n < 0 || n >= cap - dataLen) {
                errorPrint("%s() LN%d snprintf overflow on %d\n",
                           __func__, __LINE__, i);
                break;
            }
            dataLen += n;
        }
    }

    // vgroups from the command line or from the config items
    if (vgroups > 0) {
        n = snprintf(command + dataLen, cap - dataLen, " VGROUPS %d", vgroups);
        if (n < 0 || n >= cap - dataLen) {
            errorPrint("%s() LN%d snprintf overflow\n",
                           __func__, __LINE__);
            return -1;
        }
        dataLen += n;
    }

    const char *precisionClause = NULL;
    switch (database->precision) {
        case TSDB_TIME_PRECISION_MILLI:
            precisionClause = " PRECISION \'ms\';";
            break;
        case TSDB_TIME_PRECISION_MICRO:
            precisionClause = " PRECISION \'us\';";
            break;
        case TSDB_TIME_PRECISION_NANO:
            precisionClause = " PRECISION \'ns\';";
            break;
        default:
            // any other precision value: no PRECISION clause
            break;
    }
    if (precisionClause != NULL) {
        n = snprintf(command + dataLen, cap - dataLen, "%s", precisionClause);
        if (n < 0 || n >= cap - dataLen) {
            errorPrint("%s() LN%d snprintf overflow\n",
                           __func__, __LINE__);
            return -1;
        }
        // dataLen intentionally excludes the PRECISION clause
    }

    return dataLen;
}
600

601
/* Post one SQL statement for `database` through the REST socket `sockfd`. */
static int32_t restPostDbSql(SDataBase *database, char *command, int sockfd) {
    return postProcessSql(command,
                          database->dbName,
                          database->precision,
                          REST_IFACE,
                          0,
                          g_arguments->port,
                          false,
                          sockfd,
                          NULL);
}

/*
 * Drop and re-create `database` over the REST interface.
 *
 * A failing DROP is logged and otherwise ignored. The CREATE statement is
 * retried while it fails: after each failure the function sleeps
 * g_arguments->trying_interval milliseconds; the retry budget is
 * g_arguments->keep_trying, and a budget of -1 is never decremented.
 *
 * Returns -1 when no socket could be opened or the CREATE statement could
 * not be generated; otherwise the result of the last CREATE attempt.
 */
int createDatabaseRest(SDataBase* database) {
    char command[SHORT_1K_SQL_BUFF_LEN] = "\0";

    int sockfd = createSockFd();
    if (sockfd < 0) {
        return -1;
    }

    // drop exist database
    snprintf(command, SHORT_1K_SQL_BUFF_LEN,
            g_arguments->escape_character
                ? "DROP DATABASE IF EXISTS `%s`;"
                : "DROP DATABASE IF EXISTS %s;",
             database->dbName);
    if (restPostDbSql(database, command, sockfd) != 0) {
        errorPrint("Failed to drop database %s\n", database->dbName);
    }

    // create database
    int remainVnodes = INT_MAX;
    if (geneDbCreateCmd(database, command, remainVnodes) < 0) {
        // the generator already logged why; never post a partial statement
        destroySockFd(sockfd);
        return -1;
    }

    int32_t code = restPostDbSql(database, command, sockfd);
    int32_t trying = g_arguments->keep_trying;
    while (code && trying) {
        infoPrint("will sleep %"PRIu32" milliseconds then "
                "re-create database %s\n",
                g_arguments->trying_interval, database->dbName);
        toolsMsleep(g_arguments->trying_interval);
        code = restPostDbSql(database, command, sockfd);
        if (trying != -1) {
            trying--;
        }
    }

    destroySockFd(sockfd);
    return code;
}
664

665
/*
 * Run "SHOW DNODES" over `conn` and sum, across all returned rows, the value
 * of column 3 minus the value of column 2 (both read as int16_t).
 *
 * Returns that sum, or -1 when the query fails.
 *
 * Ownership: on query failure this function closes `conn` itself, so the
 * caller must not use or close the connection again after a -1 result. On
 * success the connection stays open.
 *
 * NOTE(review): columns 2/3 are presumably "vnodes in use" and "supported
 * vnodes" - confirm against the SHOW DNODES result layout.
 */
int32_t getRemainVnodes(SBenchConn *conn) {
    char command[SHORT_1K_SQL_BUFF_LEN] = "SHOW DNODES";

    TAOS_RES *res = taos_query(conn->taos, command);
    int32_t   code = taos_errno(res);
    if (code) {
        printErrCmdCodeStr(command, code, res);
        closeBenchConn(conn);
        return -1;
    }

    int total = 0;
    for (TAOS_ROW row = taos_fetch_row(res); row != NULL;
         row = taos_fetch_row(res)) {
        int16_t upper = *(int16_t*)(row[3]);
        int16_t lower = *(int16_t*)(row[2]);
        total += (upper - lower);
    }
    debugPrint("%s() LN%d, remainVnodes: %d\n",
               __func__, __LINE__, total);

    taos_free_result(res);
    return total;
}
685

686
/*
 * Drop and re-create `database` over a native connection.
 *
 * Steps:
 *   1. Drop every stream in g_arguments->streams whose `drop` flag is set.
 *   2. Drop the database.
 *   3. When g_arguments->bind_vgroup is set, require a positive number of
 *      remaining vnodes.
 *   4. Generate and execute the CREATE DATABASE statement, retrying while it
 *      fails (sleeping g_arguments->trying_interval ms between attempts;
 *      budget g_arguments->keep_trying, where -1 is never decremented).
 *   5. When g_arguments->bind_vgroup is set, load the vgroup list into
 *      `database`.
 *
 * Returns 0 on success and -1 on any failure. The connection opened here is
 * closed on every path.
 */
int createDatabaseTaosc(SDataBase* database) {
    char    command[SHORT_1K_SQL_BUFF_LEN] = "\0";
    int     remainVnodes = INT_MAX;
    int32_t code = 0;
    int32_t trying = 0;

    // conn
    SBenchConn* conn = initBenchConn();
    if (NULL == conn) {
        return -1;
    }

    // drop stream in old database
    for (int i = 0; i < g_arguments->streams->size; i++) {
        SSTREAM* stream = benchArrayGet(g_arguments->streams, i);
        if (!stream->drop) {
            continue;
        }
        snprintf(command, SHORT_1K_SQL_BUFF_LEN,
                    "DROP STREAM IF EXISTS %s;",
                stream->stream_name);
        if (queryDbExecCall(conn, command)) {
            goto fail;
        }
        infoPrint("%s\n", command);
        memset(command, 0, SHORT_1K_SQL_BUFF_LEN);
    }

    // drop old database
    snprintf(command, SHORT_1K_SQL_BUFF_LEN,
            g_arguments->escape_character
                ? "DROP DATABASE IF EXISTS `%s`;":
            "DROP DATABASE IF EXISTS %s;",
             database->dbName);
    if (0 != queryDbExecCall(conn, command)) {
        if (g_arguments->dsn) {
            // websocket
            // NOTE(review): the message says the failure is ignored, yet the
            // function still fails below - confirm which one is intended.
            warnPrint("%s", "TDengine cloud normal users have no privilege "
                      "to drop database! DROP DATABASE failure is ignored!\n");
        }
        goto fail;
    }

    // get remain vgroups
    if (g_arguments->bind_vgroup) {
        remainVnodes = getRemainVnodes(conn);
        if (0 >= remainVnodes) {
            errorPrint("Remain vnodes %d, failed to create database\n",
                       remainVnodes);
            // getRemainVnodes() has already closed conn when it reports -1;
            // for any other non-positive count the connection is still ours.
            if (remainVnodes != -1) {
                closeBenchConn(conn);
            }
            return -1;
        }
    }

    // generate and execute create database sql
    if (geneDbCreateCmd(database, command, remainVnodes) < 0) {
        // the generator already logged why; never run a partial statement
        goto fail;
    }
    code = queryDbExecCall(conn, command);
    trying = g_arguments->keep_trying;
    while (code && trying) {
        infoPrint("will sleep %"PRIu32" milliseconds then "
                  "re-create database %s\n",
                  g_arguments->trying_interval, database->dbName);
        toolsMsleep(g_arguments->trying_interval);
        code = queryDbExecCall(conn, command);
        if (trying != -1) {
            trying--;
        }
    }

    if (code) {
        if (g_arguments->dsn) {
            // NOTE(review): same contradiction as for DROP DATABASE above.
            warnPrint("%s", "TDengine cloud normal users have no privilege "
                      "to create database! CREATE DATABASE "
                      "failure is ignored!\n");
        }
        errorPrint("\ncreate database %s failed!\n\n", database->dbName);
        goto fail;
    }
    infoPrint("command to create database: <%s>\n", command);

    // malloc and get vgroup
    if (g_arguments->bind_vgroup) {
        int32_t vgroups = getVgroupsNative(conn, database);
        if (vgroups <= 0) {
            errorPrint("Database %s's vgroups is %d\n",
                        database->dbName, vgroups);
            goto fail;
        }
    }

    closeBenchConn(conn);
    return 0;

fail:
    closeBenchConn(conn);
    return -1;
}
781

782
/*
 * Create `database` through the interface selected on the command line:
 * REST when isRest(g_arguments->iface) holds, the native connection
 * otherwise. Returns the result of the chosen implementation.
 */
int createDatabase(SDataBase* database) {
    if (isRest(g_arguments->iface)) {
        return createDatabaseRest(database);
    }
    return createDatabaseTaosc(database);
}
791

792
static int generateChildTblName(int len, char *buffer, SDataBase *database,
×
793
                                SSuperTable *stbInfo, uint64_t tableSeq, char* tagData, int i,
794
                                char *ttl, char *tableName) {
795
    if (0 == len) {
×
796
        memset(buffer, 0, TSDB_MAX_ALLOWED_SQL_LEN);
×
797
        len += snprintf(buffer + len,
×
798
                        TSDB_MAX_ALLOWED_SQL_LEN - len, "CREATE TABLE");
×
799
    }
800

NEW
801
    const char *tagStart = tagData + i * stbInfo->lenOfTags;  // start position of current row's TAG
×
NEW
802
    const char *tagsForSQL = tagStart;  // actual TAG part for SQL
×
NEW
803
    const char *fmt = g_arguments->escape_character ?
×
NEW
804
        " IF NOT EXISTS `%s`.`%s` USING `%s`.`%s` TAGS (%s) %s " :
×
805
        " IF NOT EXISTS %s.%s USING %s.%s TAGS (%s) %s ";
806

NEW
807
    if (stbInfo->useTagTableName) {
×
NEW
808
        char *firstComma = strchr(tagStart, ',');
×
NEW
809
        if (firstComma == NULL && tagStart >= firstComma) {
×
NEW
810
            errorPrint("Invalid tag data format: %s\n", tagStart);
×
NEW
811
            return -1;
×
812
        }
NEW
813
        size_t nameLen = firstComma - tagStart;
×
NEW
814
        strncpy(tableName, tagStart, nameLen);
×
NEW
815
        tableName[nameLen] = '\0';
×
NEW
816
        tagsForSQL = firstComma + 1;      
×
817
    } else {
818
        // generate table name using prefix + sequence number 
NEW
819
        snprintf(tableName, TSDB_MAX_ALLOWED_SQL_LEN, "%s%" PRIu64, 
×
820
                 stbInfo->childTblPrefix, tableSeq);
NEW
821
        tagsForSQL = tagStart;
×
822

823
    }        
824

NEW
825
    len += snprintf(buffer + len, TSDB_MAX_ALLOWED_SQL_LEN - len, fmt,
×
826
                    database->dbName, tableName,
827
                    database->dbName, stbInfo->stbName,
828
                    tagsForSQL, ttl);
NEW
829
    debugPrint("create table: <%s> <%s>\n", buffer, tableName);
×
830
    return len;
×
831
}
832

833
static int getBatchOfTblCreating(threadInfo *pThreadInfo,
×
834
                                         SSuperTable *stbInfo) {
835
    BArray *batchArray = stbInfo->batchTblCreatingNumbersArray;
×
836
    if (batchArray) {
×
UNCOV
837
        int *batch = benchArrayGet(
×
UNCOV
838
                batchArray, pThreadInfo->posOfTblCreatingBatch);
×
839
        pThreadInfo->posOfTblCreatingBatch++;
×
UNCOV
840
        if (pThreadInfo->posOfTblCreatingBatch == batchArray->size) {
×
841
            pThreadInfo->posOfTblCreatingBatch = 0;
×
842
        }
UNCOV
843
        return *batch;
×
844
    }
845
    return 0;
×
846
}
847

UNCOV
848
static int getIntervalOfTblCreating(threadInfo *pThreadInfo,
×
849
                                         SSuperTable *stbInfo) {
850
    BArray *intervalArray = stbInfo->batchTblCreatingIntervalsArray;
×
UNCOV
851
    if (intervalArray) {
×
UNCOV
852
        int *interval = benchArrayGet(
×
853
                intervalArray, pThreadInfo->posOfTblCreatingInterval);
×
UNCOV
854
        pThreadInfo->posOfTblCreatingInterval++;
×
855
        if (pThreadInfo->posOfTblCreatingInterval == intervalArray->size) {
×
856
            pThreadInfo->posOfTblCreatingInterval = 0;
×
857
        }
858
        return *interval;
×
859
    }
860
    return 0;
×
861
}
862

863
// table create thread
UNCOV
864
static void *createTable(void *sarg) {
×
865
    if (g_arguments->supplementInsert) {
×
UNCOV
866
        return NULL;
×
867
    }
868

UNCOV
869
    threadInfo  *pThreadInfo    = (threadInfo *)sarg;
×
870
    SDataBase   *database       = pThreadInfo->dbInfo;
×
871
    SSuperTable *stbInfo        = pThreadInfo->stbInfo;
×
872
    uint64_t    lastTotalCreate = 0;
×
873
    uint64_t    lastPrintTime   = toolsGetTimestampMs();
×
874
    int32_t     len             = 0;
×
875
    int32_t     batchNum        = 0;
×
876
    char ttl[SMALL_BUFF_LEN]    = "";
×
877

878
#ifdef LINUX
UNCOV
879
    prctl(PR_SET_NAME, "createTable");
×
880
#endif
UNCOV
881
    pThreadInfo->buffer = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, false);
×
UNCOV
882
    infoPrint(
×
883
              "thread[%d] start creating table from %" PRIu64 " to %" PRIu64
884
              "\n",
885
              pThreadInfo->threadID, pThreadInfo->start_table_from,
886
              pThreadInfo->end_table_to);
UNCOV
887
    if (stbInfo->ttl != 0) {
×
UNCOV
888
        snprintf(ttl, SMALL_BUFF_LEN, "TTL %d", stbInfo->ttl);
×
889
    }
890

891
    // tag read from csv
892
    FILE *csvFile = openTagCsv(stbInfo, pThreadInfo->start_table_from);
×
893
    // malloc
894
    char* tagData = benchCalloc(TAG_BATCH_COUNT, stbInfo->lenOfTags, false);
×
895
    int         w = 0; // record tagData
×
896

UNCOV
897
    int smallBatchCount = 0;
×
UNCOV
898
    int index=  pThreadInfo->start_table_from;
×
NEW
899
    int tableSum = pThreadInfo->end_table_to - pThreadInfo->start_table_from + 1;
×
NEW
900
    if (stbInfo->useTagTableName) {
×
NEW
901
        pThreadInfo->childNames = benchCalloc(tableSum, sizeof(char *), false);
×
NEW
902
        pThreadInfo->childTblCount = tableSum;
×
903
    }
904

NEW
905
    for (uint64_t i = pThreadInfo->start_table_from, j = 0;
×
906
                  i <= pThreadInfo->end_table_to && !g_arguments->terminate;
×
NEW
907
                  i++, j++) {
×
908
        if (g_arguments->terminate) {
×
UNCOV
909
            goto create_table_end;
×
910
        }
UNCOV
911
        if (!stbInfo->use_metric || stbInfo->tags->size == 0) {
×
UNCOV
912
            if (stbInfo->childTblCount == 1) {
×
913
                snprintf(pThreadInfo->buffer, TSDB_MAX_ALLOWED_SQL_LEN,
×
914
                         g_arguments->escape_character
×
915
                         ? "CREATE TABLE IF NOT EXISTS `%s`.`%s` %s;"
916
                         : "CREATE TABLE IF NOT EXISTS %s.%s %s;",
917
                         database->dbName, stbInfo->stbName,
918
                         stbInfo->colsOfCreateChildTable);
919
            } else {
920
                snprintf(pThreadInfo->buffer, TSDB_MAX_ALLOWED_SQL_LEN,
×
921
                         g_arguments->escape_character
×
922
                         ? "CREATE TABLE IF NOT EXISTS `%s`.`%s` %s;"
923
                         : "CREATE TABLE IF NOT EXISTS %s.%s %s;",
924
                         database->dbName,
925
                         stbInfo->childTblArray[i]->name,
×
926
                         stbInfo->colsOfCreateChildTable);
927
            }
928
            batchNum++;
×
929
        } else {
UNCOV
930
            if (0 == len) {
×
931
                batchNum = 0;
×
932
            }
933
            // generator
934
            if (w == 0) {
×
935
                if(!generateTagData(stbInfo, tagData, TAG_BATCH_COUNT, csvFile, NULL, index)) {
×
UNCOV
936
                    goto create_table_end;
×
937
                }
938
            }
NEW
939
            char tbName[TSDB_TABLE_NAME_LEN] = {0};
×
940
            len = generateChildTblName(len, pThreadInfo->buffer,
×
941
                                       database, stbInfo, i, tagData, w, ttl, tbName);
NEW
942
            if (stbInfo->useTagTableName) {                     
×
NEW
943
                pThreadInfo->childNames[j] = strdup(tbName);
×
944
            }
945
            // move next
UNCOV
946
            if (++w >= TAG_BATCH_COUNT) {
×
947
                // reset for gen again
UNCOV
948
                w = 0;
×
949
                index += TAG_BATCH_COUNT;
×
950
            }                           
951

UNCOV
952
            batchNum++;
×
UNCOV
953
            smallBatchCount++;
×
954

UNCOV
955
            int smallBatch = getBatchOfTblCreating(pThreadInfo, stbInfo);
×
UNCOV
956
            if ((!smallBatch || (smallBatchCount == smallBatch))
×
957
                    && (batchNum < stbInfo->batchTblCreatingNum)
×
UNCOV
958
                    && ((TSDB_MAX_ALLOWED_SQL_LEN - len) >=
×
959
                        (stbInfo->lenOfTags + EXTRA_SQL_LEN))) {
×
960
                continue;
×
961
            } else {
UNCOV
962
                smallBatchCount = 0;
×
963
            }
964
        }
965

UNCOV
966
        len = 0;
×
967

968
        int ret = 0;
×
969
        debugPrint("thread[%d] creating table: %s\n", pThreadInfo->threadID,
×
970
                   pThreadInfo->buffer);
971
        // REST
972
        if (REST_IFACE == stbInfo->iface) {
×
UNCOV
973
            ret = queryDbExecRest(pThreadInfo->buffer,
×
974
                                  database->dbName,
975
                                  database->precision,
UNCOV
976
                                  stbInfo->iface,
×
977
                                  stbInfo->lineProtocol,
×
978
                                  stbInfo->tcpTransfer,
×
979
                                  pThreadInfo->sockfd);
980
        } else {
981
            ret = queryDbExecCall(pThreadInfo->conn, pThreadInfo->buffer);
×
982
            int32_t trying = g_arguments->keep_trying;
×
UNCOV
983
            while (ret && trying) {
×
984
                infoPrint("will sleep %"PRIu32" milliseconds then re-create "
×
985
                          "table %s\n",
986
                          g_arguments->trying_interval, pThreadInfo->buffer);
987
                toolsMsleep(g_arguments->trying_interval);
×
988
                ret = queryDbExecCall(pThreadInfo->conn, pThreadInfo->buffer);
×
989
                if (trying != -1) {
×
UNCOV
990
                    trying--;
×
991
                }
992
            }
993
        }
994

995
        if (0 != ret) {
×
UNCOV
996
            g_fail = true;
×
997
            goto create_table_end;
×
998
        }
UNCOV
999
        uint64_t intervalOfTblCreating = getIntervalOfTblCreating(pThreadInfo,
×
1000
                                                                  stbInfo);
1001
        if (intervalOfTblCreating) {
×
1002
            debugPrint("will sleep %"PRIu64" milliseconds "
×
1003
                       "for table creating interval\n", intervalOfTblCreating);
UNCOV
1004
            toolsMsleep(intervalOfTblCreating);
×
1005
        }
1006

1007
        pThreadInfo->tables_created += batchNum;
×
UNCOV
1008
        batchNum = 0;
×
UNCOV
1009
        uint64_t currentPrintTime = toolsGetTimestampMs();
×
1010
        if (currentPrintTime - lastPrintTime > PRINT_STAT_INTERVAL) {
×
1011
            float speed = (pThreadInfo->tables_created - lastTotalCreate) * 1000 / (currentPrintTime - lastPrintTime);
×
1012
            infoPrint("thread[%d] already created %" PRId64 " tables, peroid speed: %.0f tables/s\n",
×
1013
                       pThreadInfo->threadID, pThreadInfo->tables_created, speed);
UNCOV
1014
            lastPrintTime   = currentPrintTime;
×
UNCOV
1015
            lastTotalCreate = pThreadInfo->tables_created;
×
1016
        }
1017
    }
1018

1019
    if (0 != len) {
×
UNCOV
1020
        int ret = 0;
×
UNCOV
1021
        debugPrint("thread[%d] creating table: %s\n", pThreadInfo->threadID,
×
1022
                   pThreadInfo->buffer);
1023
        // REST
1024
        if (REST_IFACE == stbInfo->iface) {
×
1025
            ret = queryDbExecRest(pThreadInfo->buffer,
×
1026
                                  database->dbName,
1027
                                  database->precision,
1028
                                  stbInfo->iface,
×
UNCOV
1029
                                  stbInfo->lineProtocol,
×
1030
                                  stbInfo->tcpTransfer,
×
1031
                                  pThreadInfo->sockfd);
1032
        } else {
1033
            ret = queryDbExecCall(pThreadInfo->conn, pThreadInfo->buffer);
×
1034
        }
UNCOV
1035
        if (0 != ret) {
×
1036
            g_fail = true;
×
1037
            goto create_table_end;
×
1038
        }
1039
        pThreadInfo->tables_created += batchNum;
×
1040
        debugPrint("thread[%d] already created %" PRId64 " tables\n",
×
1041
                   pThreadInfo->threadID, pThreadInfo->tables_created);
1042
    }
1043
create_table_end:
×
1044
    // free
UNCOV
1045
    tmfree(tagData);
×
UNCOV
1046
    tmfree(pThreadInfo->buffer);
×
UNCOV
1047
    pThreadInfo->buffer = NULL;
×
1048
    if(csvFile) {
×
1049
        fclose(csvFile);
×
1050
    }
UNCOV
1051
    return NULL;
×
1052
}
1053

1054
/*
 * Create the child tables of `stbInfo` with up to g_arguments->table_threads
 * worker threads running createTable().
 *
 * The number of tables is derived from childTblFrom / childTblTo /
 * childTblCount and split as evenly as possible; the first (ntables % threads)
 * workers get one extra table. Each worker gets its own connection (native)
 * or socket (REST).
 *
 * After all started workers have been joined, their tables_created counters
 * are added to g_arguments->actualChildTables, native connections are closed,
 * and - when stbInfo->useTagTableName is set - the table names collected by
 * each worker are moved into stbInfo->childTblArray at the worker's own
 * table positions.
 *
 * Returns 0 on success (also when there is nothing to create) and -1 when a
 * worker could not be set up or g_fail was raised.
 */
static int startMultiThreadCreateChildTable(SDataBase* database, SSuperTable* stbInfo) {
    int32_t code    = -1;
    int32_t threads = g_arguments->table_threads;
    int64_t ntables;

    if (stbInfo->childTblTo > 0) {
        ntables = stbInfo->childTblTo - stbInfo->childTblFrom;
    } else if(stbInfo->childTblFrom > 0) {
        ntables = stbInfo->childTblCount - stbInfo->childTblFrom;
    } else {
        ntables = stbInfo->childTblCount;
    }

    // Clamp before sizing the arrays: they are indexed by thread number.
    if (threads < 1) {
        threads = 1;
    }
    pthread_t   *pids = benchCalloc(1, threads * sizeof(pthread_t), false);
    threadInfo  *infos = benchCalloc(1, threads * sizeof(threadInfo), false);
    uint64_t     tableFrom = stbInfo->childTblFrom;

    if (ntables == 0) {
        code = 0;
        infoPrint("child table is zero, no need create. childTblCount: %"PRId64"\n", ntables);
        goto over;
    }
    if (ntables < 0) {
        errorPrint("invalid child table range, computed table count: %"PRId64"\n", ntables);
        goto over;
    }

    int64_t div = ntables / threads;
    if (div < 1) {
        // fewer tables than threads: one table per thread
        threads = (int)ntables;
        div = 1;
    }
    int64_t mod = ntables % threads;

    bool launchFailed = false;
    int  threadCnt    = 0;
    for (int32_t i = 0; (i < threads && !g_arguments->terminate); i++) {
        threadInfo *pThreadInfo = infos + i;
        pThreadInfo->threadID = i;
        pThreadInfo->stbInfo = stbInfo;
        pThreadInfo->dbInfo = database;
        if (REST_IFACE == stbInfo->iface) {
            int sockfd = createSockFd();
            if (sockfd < 0) {
                // Workers already started still use `infos`; they must be
                // joined before anything is released.
                launchFailed = true;
                break;
            }
            pThreadInfo->sockfd = sockfd;
        } else {
            pThreadInfo->conn = initBenchConn();
            if (NULL == pThreadInfo->conn) {
                launchFailed = true;
                break;
            }
        }
        pThreadInfo->start_table_from = tableFrom;
        pThreadInfo->ntables          = i < mod ? div + 1 : div;
        pThreadInfo->end_table_to     = i < mod ? tableFrom + div : tableFrom + div - 1;
        tableFrom = pThreadInfo->end_table_to + 1;
        pThreadInfo->tables_created = 0;
        debugPrint("div table by thread. i=%d from=%"PRId64" to=%"PRId64" ntable=%"PRId64"\n", i, pThreadInfo->start_table_from,
                                        pThreadInfo->end_table_to, pThreadInfo->ntables);
        if (pthread_create(pids + i, NULL, createTable, pThreadInfo) != 0) {
            errorPrint("failed to start table creating thread %d\n", i);
            // This worker never ran, so the collection loop below will not
            // see it; release its connection here.
            if ((REST_IFACE != stbInfo->iface) && pThreadInfo->conn) {
                closeBenchConn(pThreadInfo->conn);
                pThreadInfo->conn = NULL;
            }
            launchFailed = true;
            break;
        }
        threadCnt ++;
    }

    for (int i = 0; i < threadCnt; i++) {
        pthread_join(pids[i], NULL);
    }

    if (g_arguments->terminate)  toolsMsleep(100);

    for (int i = 0; i < threadCnt; i++) {
        threadInfo *pThreadInfo = infos + i;
        g_arguments->actualChildTables += pThreadInfo->tables_created;

        if ((REST_IFACE != stbInfo->iface) && pThreadInfo->conn) {
            closeBenchConn(pThreadInfo->conn);
        }

        if (stbInfo->useTagTableName) {
            for (int j = 0; j < pThreadInfo->childTblCount; j++) {
                char *newName = pThreadInfo->childNames[j];
                if (NULL == newName) {
                    // worker stopped before reaching this table
                    continue;
                }
                // childTblArray is indexed by absolute table number, the same
                // index createTable() uses for this worker's range.
                uint64_t tblIdx = pThreadInfo->start_table_from + (uint64_t)j;
                tmfree(stbInfo->childTblArray[tblIdx]->name);
                stbInfo->childTblArray[tblIdx]->name = newName;
            }
            tmfree(pThreadInfo->childNames);
        }
    }

    if (launchFailed || g_fail) {
        goto over;
    }
    code = 0;
over:
    free(pids);
    free(infos);
    return code;
}
1146

UNCOV
1147
static int createChildTables() {
×
1148
    int32_t    code;
1149
    infoPrint("start creating %" PRId64 " table(s) with %d thread(s)\n",
×
1150
              g_arguments->totalChildTables, g_arguments->table_threads);
UNCOV
1151
    if (g_arguments->fpOfInsertResult) {
×
1152
        infoPrintToFile(
×
1153
                  "start creating %" PRId64 " table(s) with %d thread(s)\n",
1154
                  g_arguments->totalChildTables, g_arguments->table_threads);
1155
    }
1156
    int64_t start = (double)toolsGetTimestampMs();
×
1157

1158
    for (int i = 0; (i < g_arguments->databases->size
×
1159
            && !g_arguments->terminate); i++) {
×
1160
        SDataBase * database = benchArrayGet(g_arguments->databases, i);
×
1161
        if (database->superTbls) {
×
UNCOV
1162
            for (int j = 0; (j < database->superTbls->size
×
1163
                    && !g_arguments->terminate); j++) {
×
1164
                SSuperTable * stbInfo = benchArrayGet(database->superTbls, j);
×
UNCOV
1165
                if (stbInfo->autoTblCreating || stbInfo->iface == SML_IFACE
×
UNCOV
1166
                    || stbInfo->iface == SML_REST_IFACE) {
×
1167
                    g_arguments->autoCreatedChildTables +=
×
1168
                            stbInfo->childTblCount;
×
1169
                    continue;
×
1170
                }
1171
                if (stbInfo->childTblExists) {
×
UNCOV
1172
                    g_arguments->existedChildTables +=
×
UNCOV
1173
                            stbInfo->childTblCount;
×
UNCOV
1174
                    continue;
×
1175
                }
1176
                debugPrint("colsOfCreateChildTable: %s\n",
×
1177
                        stbInfo->colsOfCreateChildTable);
1178

1179
                code = startMultiThreadCreateChildTable(database, stbInfo);
×
1180
                if (code && !g_arguments->terminate) {
×
1181
                    return code;
×
1182
                }
1183
            }
1184
        }
1185
    }
1186

1187
    int64_t end = toolsGetTimestampMs();
×
UNCOV
1188
    if(end == start) {
×
1189
        end += 1;
×
1190
    }
UNCOV
1191
    succPrint(
×
1192
            "Spent %.4f seconds to create %" PRId64
1193
            " table(s) with %d thread(s) speed: %.0f tables/s, already exist %" PRId64
1194
            " table(s), actual %" PRId64 " table(s) pre created, %" PRId64
1195
            " table(s) will be auto created\n",
1196
            (float)(end - start) / 1000.0,
1197
            g_arguments->totalChildTables,
1198
            g_arguments->table_threads,
1199
            g_arguments->actualChildTables * 1000 / (float)(end - start),
1200
            g_arguments->existedChildTables,
1201
            g_arguments->actualChildTables,
1202
            g_arguments->autoCreatedChildTables);
1203
    return 0;
×
1204
}
1205

1206
/*
 * Release a child table descriptor.
 *
 * When the table uses its own sample data (useOwnSample), the stmt buffers
 * (data, is_null, lengths) of the first `colsSize` entries of childCols are
 * freed, the childCols array is destroyed and sampleDataBuf is freed.
 * The descriptor itself is always freed.
 */
static void freeChildTable(SChildTable *childTbl, int colsSize) {
    if (!childTbl->useOwnSample) {
        tmfree(childTbl);
        return;
    }

    if (childTbl->childCols) {
        for (int idx = 0; idx < colsSize; idx++) {
            ChildField *field = benchArrayGet(childTbl->childCols, idx);
            if (!field) {
                continue;
            }
            tmfree(field->stmtData.data);
            field->stmtData.data = NULL;
            tmfree(field->stmtData.is_null);
            field->stmtData.is_null = NULL;
            tmfree(field->stmtData.lengths);
            field->stmtData.lengths = NULL;
        }
        benchArrayDestroy(childTbl->childCols);
    }
    tmfree(childTbl->sampleDataBuf);

    tmfree(childTbl);
}
×
1227

UNCOV
1228
void postFreeResource() {
×
1229
    infoPrint("%s\n", "free resource and exit ...");
×
UNCOV
1230
    if (!g_arguments->terminate) {
×
UNCOV
1231
        tmfclose(g_arguments->fpOfInsertResult);
×
1232
    }
1233

UNCOV
1234
    for (int i = 0; i < g_arguments->databases->size; i++) {
×
UNCOV
1235
        SDataBase * database = benchArrayGet(g_arguments->databases, i);
×
UNCOV
1236
        if (database->cfgs) {
×
UNCOV
1237
            for (int c = 0; c < database->cfgs->size; c++) {
×
UNCOV
1238
                SDbCfg *cfg = benchArrayGet(database->cfgs, c);
×
UNCOV
1239
                if (cfg->valuestring && cfg->free) {
×
UNCOV
1240
                    tmfree(cfg->valuestring);
×
1241
                    cfg->valuestring = NULL;
×
1242
                }
1243
            }
1244
            benchArrayDestroy(database->cfgs);
×
1245
        }
1246
        if (database->superTbls) {
×
1247
            for (uint64_t j = 0; j < database->superTbls->size; j++) {
×
UNCOV
1248
                SSuperTable * stbInfo = benchArrayGet(database->superTbls, j);
×
1249
                tmfree(stbInfo->colsOfCreateChildTable);
×
1250
                stbInfo->colsOfCreateChildTable = NULL;
×
1251
                tmfree(stbInfo->sampleDataBuf);
×
1252
                stbInfo->sampleDataBuf = NULL;
×
1253
                tmfree(stbInfo->partialColNameBuf);
×
1254
                stbInfo->partialColNameBuf = NULL;
×
1255
                benchArrayDestroy(stbInfo->batchTblCreatingNumbersArray);
×
1256
                benchArrayDestroy(stbInfo->batchTblCreatingIntervalsArray);
×
UNCOV
1257
                for (int k = 0; k < stbInfo->tags->size; k++) {
×
UNCOV
1258
                    Field * tag = benchArrayGet(stbInfo->tags, k);
×
1259
                    tmfree(tag->stmtData.data);
×
UNCOV
1260
                    tag->stmtData.data = NULL;
×
1261
                    tmfree(tag->stmtData.is_null);
×
UNCOV
1262
                    tag->stmtData.is_null = NULL;
×
1263
                    tmfree(tag->stmtData.lengths);
×
1264
                    tag->stmtData.lengths = NULL;
×
1265
                }
1266
                benchArrayDestroy(stbInfo->tags);
×
1267

1268
                for (int k = 0; k < stbInfo->cols->size; k++) {
×
1269
                    Field * col = benchArrayGet(stbInfo->cols, k);
×
UNCOV
1270
                    tmfree(col->stmtData.data);
×
UNCOV
1271
                    col->stmtData.data = NULL;
×
1272
                    tmfree(col->stmtData.is_null);
×
1273
                    col->stmtData.is_null = NULL;
×
1274
                    tmfree(col->stmtData.lengths);
×
1275
                    col->stmtData.lengths = NULL;
×
1276
                }
1277
                if (g_arguments->test_mode == INSERT_TEST) {
×
1278
                    if (stbInfo->childTblArray) {
×
1279
                        for (int64_t child = 0; child < stbInfo->childTblCount;
×
UNCOV
1280
                                child++) {
×
UNCOV
1281
                            SChildTable *childTbl = stbInfo->childTblArray[child];
×
1282
                            if (childTbl) {
×
UNCOV
1283
                                tmfree(childTbl->name);
×
1284
                                freeChildTable(childTbl, stbInfo->cols->size);
×
1285
                            }
1286
                        }
1287
                    }
1288
                }
1289
                benchArrayDestroy(stbInfo->cols);
×
1290
                tmfree(stbInfo->childTblArray);
×
1291
                stbInfo->childTblArray = NULL;
×
1292
                benchArrayDestroy(stbInfo->tsmas);
×
1293

1294
                // free sqls
1295
                if(stbInfo->sqls) {
×
1296
                    char **sqls = stbInfo->sqls;
×
1297
                    while (*sqls) {
×
1298
                        free(*sqls);
×
1299
                        sqls++;
×
1300
                    }
1301
                    tmfree(stbInfo->sqls);
×
1302
                }
1303

1304
                // thread_bind
UNCOV
1305
                if (database->vgArray) {
×
1306
                    for (int32_t v = 0; v < database->vgroups; v++) {
×
1307
                        SVGroup *vg = benchArrayGet(database->vgArray, v);
×
1308
                        tmfree(vg->childTblArray);
×
1309
                        vg->childTblArray = NULL;
×
1310
                    }
1311
                    benchArrayDestroy(database->vgArray);
×
1312
                    database->vgArray = NULL;
×
1313
                }
1314
            }
1315
            benchArrayDestroy(database->superTbls);
×
1316
        }
1317
    }
1318
    benchArrayDestroy(g_arguments->databases);
×
1319
    benchArrayDestroy(g_arguments->streams);
×
1320
    tools_cJSON_Delete(root);
×
1321
}
×
1322

UNCOV
1323
int32_t execInsert(threadInfo *pThreadInfo, uint32_t k, int64_t *delay3) {
×
UNCOV
1324
    SDataBase *  database = pThreadInfo->dbInfo;
×
UNCOV
1325
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
×
UNCOV
1326
    TAOS_RES *   res = NULL;
×
1327
    int32_t      code = 0;
×
1328
    uint16_t     iface = stbInfo->iface;
×
1329
    int64_t      start = 0;
×
1330
    int32_t      affectRows = 0;
×
1331

UNCOV
1332
    int32_t trying = (stbInfo->keep_trying)?
×
1333
        stbInfo->keep_trying:g_arguments->keep_trying;
×
1334
    int32_t trying_interval = stbInfo->trying_interval?
×
1335
        stbInfo->trying_interval:g_arguments->trying_interval;
×
1336
    int protocol = stbInfo->lineProtocol;
×
1337

UNCOV
1338
    switch (iface) {
×
1339
        case TAOSC_IFACE:
×
UNCOV
1340
            debugPrint("buffer: %s\n", pThreadInfo->buffer);
×
UNCOV
1341
            code = queryDbExecCall(pThreadInfo->conn, pThreadInfo->buffer);
×
UNCOV
1342
            while (code && trying && !g_arguments->terminate) {
×
1343
                infoPrint("will sleep %"PRIu32" milliseconds then re-insert\n",
×
1344
                          trying_interval);
1345
                toolsMsleep(trying_interval);
×
1346
                code = queryDbExecCall(pThreadInfo->conn, pThreadInfo->buffer);
×
1347
                if (trying != -1) {
×
UNCOV
1348
                    trying--;
×
1349
                }
1350
            }
UNCOV
1351
            break;
×
1352
        // REST
1353
        case REST_IFACE:
×
UNCOV
1354
            debugPrint("buffer: %s\n", pThreadInfo->buffer);
×
UNCOV
1355
            code = postProcessSql(pThreadInfo->buffer,
×
1356
                                database->dbName,
1357
                                database->precision,
1358
                                stbInfo->iface,
×
1359
                                stbInfo->lineProtocol,
×
UNCOV
1360
                                g_arguments->port,
×
1361
                                stbInfo->tcpTransfer,
×
1362
                                pThreadInfo->sockfd,
1363
                                pThreadInfo->filePath);
×
1364
            while (code && trying && !g_arguments->terminate) {
×
1365
                infoPrint("will sleep %"PRIu32" milliseconds then re-insert\n",
×
1366
                          trying_interval);
1367
                toolsMsleep(trying_interval);
×
1368
                code = postProcessSql(pThreadInfo->buffer,
×
1369
                                    database->dbName,
1370
                                    database->precision,
1371
                                    stbInfo->iface,
×
1372
                                    stbInfo->lineProtocol,
×
1373
                                    g_arguments->port,
×
1374
                                    stbInfo->tcpTransfer,
×
1375
                                    pThreadInfo->sockfd,
1376
                                    pThreadInfo->filePath);
×
1377
                if (trying != -1) {
×
1378
                    trying--;
×
1379
                }
1380
            }
1381
            break;
×
1382
            
1383
        case STMT_IFACE:
×
1384
            // add batch
1385
            if(!stbInfo->autoTblCreating) {
×
1386
                start = toolsGetTimestampUs();
×
UNCOV
1387
                if (taos_stmt_add_batch(pThreadInfo->conn->stmt) != 0) {
×
UNCOV
1388
                    errorPrint("taos_stmt_add_batch() failed! reason: %s\n",
×
1389
                            taos_stmt_errstr(pThreadInfo->conn->stmt));
UNCOV
1390
                    return -1;
×
1391
                }
1392
                if(delay3) {
×
1393
                    *delay3 += toolsGetTimestampUs() - start;
×
1394
                }
1395
            }
1396
            
1397
            // execute 
1398
            code = taos_stmt_execute(pThreadInfo->conn->stmt);
×
1399
            if (code) {
×
UNCOV
1400
                errorPrint(
×
1401
                           "failed to execute insert statement. reason: %s\n",
1402
                           taos_stmt_errstr(pThreadInfo->conn->stmt));
1403
                code = -1;
×
1404
            }
1405
            break;
×
1406

UNCOV
1407
        case STMT2_IFACE:
×
1408
            // execute 
1409
            code = taos_stmt2_exec(pThreadInfo->conn->stmt2, &affectRows);
×
1410
            if (code) {
×
1411
                errorPrint( "failed to call taos_stmt2_exec(). reason: %s\n", taos_stmt2_error(pThreadInfo->conn->stmt2));
×
1412
                code = -1;
×
1413
            }
1414
            debugPrint( "succ call taos_stmt2_exec() affectRows:%d\n", affectRows);
×
1415
            break;
×
1416

UNCOV
1417
        case SML_IFACE:
×
UNCOV
1418
            res = taos_schemaless_insert(
×
1419
                pThreadInfo->conn->taos, pThreadInfo->lines,
×
1420
                (TSDB_SML_JSON_PROTOCOL == protocol
1421
                    || SML_JSON_TAOS_FORMAT == protocol)
×
1422
                    ? 0 : k,
1423
                (SML_JSON_TAOS_FORMAT == protocol)
1424
                    ? TSDB_SML_JSON_PROTOCOL : protocol,
1425
                (TSDB_SML_LINE_PROTOCOL == protocol)
1426
                    ? database->sml_precision
1427
                    : TSDB_SML_TIMESTAMP_NOT_CONFIGURED);
1428
            code = taos_errno(res);
×
UNCOV
1429
            trying = stbInfo->keep_trying;
×
1430
            while (code && trying && !g_arguments->terminate) {
×
1431
                taos_free_result(res);
×
UNCOV
1432
                infoPrint("will sleep %"PRIu32" milliseconds then re-insert\n",
×
1433
                          trying_interval);
UNCOV
1434
                toolsMsleep(trying_interval);
×
UNCOV
1435
                res = taos_schemaless_insert(
×
1436
                        pThreadInfo->conn->taos, pThreadInfo->lines,
×
1437
                        (TSDB_SML_JSON_PROTOCOL == protocol
1438
                            || SML_JSON_TAOS_FORMAT == protocol)
×
1439
                            ? 0 : k,
1440
                        (SML_JSON_TAOS_FORMAT == protocol)
1441
                            ? TSDB_SML_JSON_PROTOCOL : protocol,
1442
                        (TSDB_SML_LINE_PROTOCOL == protocol)
1443
                            ? database->sml_precision
1444
                            : TSDB_SML_TIMESTAMP_NOT_CONFIGURED);
1445
                code = taos_errno(res);
×
UNCOV
1446
                if (trying != -1) {
×
1447
                    trying--;
×
1448
                }
1449
            }
1450

UNCOV
1451
            if (code != TSDB_CODE_SUCCESS && !g_arguments->terminate) {
×
1452
                debugPrint("Failed to execute "
×
1453
                           "schemaless insert content: %s\n\n",
1454
                        pThreadInfo->lines?(pThreadInfo->lines[0]?
1455
                            pThreadInfo->lines[0]:""):"");
1456
                errorPrint(
×
1457
                    "failed to execute schemaless insert. "
1458
                        "code: 0x%08x reason: %s\n\n",
1459
                        code, taos_errstr(res));
1460
            }
UNCOV
1461
            taos_free_result(res);
×
UNCOV
1462
            break;
×
UNCOV
1463
        case SML_REST_IFACE: {
×
UNCOV
1464
            if (TSDB_SML_JSON_PROTOCOL == protocol
×
UNCOV
1465
                    || SML_JSON_TAOS_FORMAT == protocol) {
×
1466
                code = postProcessSql(pThreadInfo->lines[0], database->dbName,
×
1467
                                    database->precision, stbInfo->iface,
×
1468
                                    protocol, g_arguments->port,
×
1469
                                    stbInfo->tcpTransfer,
×
1470
                                    pThreadInfo->sockfd, pThreadInfo->filePath);
×
1471
            } else {
1472
                int len = 0;
×
1473
                for (int i = 0; i < k; i++) {
×
1474
                    if (strlen(pThreadInfo->lines[i]) != 0) {
×
1475
                        int n;
1476
                        if (TSDB_SML_TELNET_PROTOCOL == protocol
×
UNCOV
1477
                                && stbInfo->tcpTransfer) {
×
UNCOV
1478
                            n = snprintf(pThreadInfo->buffer + len,
×
UNCOV
1479
                                            TSDB_MAX_ALLOWED_SQL_LEN - len,
×
UNCOV
1480
                                           "put %s\n", pThreadInfo->lines[i]);
×
1481
                        } else {
UNCOV
1482
                            n = snprintf(pThreadInfo->buffer + len,
×
1483
                                            TSDB_MAX_ALLOWED_SQL_LEN - len,
×
1484
                                            "%s\n",
1485
                                           pThreadInfo->lines[i]);
×
1486
                        }
UNCOV
1487
                        if (n < 0 || n >= TSDB_MAX_ALLOWED_SQL_LEN - len) {
×
UNCOV
1488
                            errorPrint("%s() LN%d snprintf overflow on %d\n",
×
1489
                                __func__, __LINE__, i);
1490
                            break;
×
1491
                        } else {
UNCOV
1492
                            len += n;
×
1493
                        }
1494
                    } else {
UNCOV
1495
                        break;
×
1496
                    }
1497
                }
UNCOV
1498
                if (g_arguments->terminate) {
×
1499
                    break;
×
1500
                }
1501
                code = postProcessSql(pThreadInfo->buffer, database->dbName,
×
1502
                        database->precision,
1503
                        stbInfo->iface, protocol,
×
1504
                        g_arguments->port,
×
1505
                        stbInfo->tcpTransfer,
×
1506
                        pThreadInfo->sockfd, pThreadInfo->filePath);
×
1507
            }
1508
            break;
×
1509
        }
1510
    }
1511
    return code;
×
1512
}
1513

1514
static int smartContinueIfFail(threadInfo *pThreadInfo,
×
1515
                               SChildTable *childTbl,
1516
                               char *tagData,
1517
                               int64_t i,
1518
                               char *ttl) {
UNCOV
1519
    SDataBase *  database = pThreadInfo->dbInfo;
×
1520
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
×
1521
    char *buffer =
UNCOV
1522
        benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, false);
×
1523
    snprintf(
×
1524
            buffer, TSDB_MAX_ALLOWED_SQL_LEN,
1525
            g_arguments->escape_character ?
×
1526
                "CREATE TABLE IF NOT EXISTS `%s`.`%s` USING `%s`.`%s` TAGS (%s) %s "
1527
                : "CREATE TABLE IF NOT EXISTS %s.%s USING %s.%s TAGS (%s) %s ",
1528
            database->dbName, childTbl->name, database->dbName,
1529
            stbInfo->stbName,
1530
            tagData + i * stbInfo->lenOfTags, ttl);
×
UNCOV
1531
    debugPrint("creating table: %s\n", buffer);
×
1532

1533
    int ret;
UNCOV
1534
    if (REST_IFACE == stbInfo->iface) {
×
UNCOV
1535
        ret = queryDbExecRest(buffer,
×
1536
                              database->dbName,
1537
                              database->precision,
UNCOV
1538
                              stbInfo->iface,
×
1539
                              stbInfo->lineProtocol,
×
UNCOV
1540
                              stbInfo->tcpTransfer,
×
1541
                              pThreadInfo->sockfd);
1542
    } else {
1543
        ret = queryDbExecCall(pThreadInfo->conn, buffer);
×
1544
        int32_t trying = g_arguments->keep_trying;
×
UNCOV
1545
        while (ret && trying) {
×
1546
            infoPrint("will sleep %"PRIu32" milliseconds then "
×
1547
                      "re-create table %s\n",
1548
                      g_arguments->trying_interval, buffer);
1549
            toolsMsleep(g_arguments->trying_interval);
×
UNCOV
1550
            ret = queryDbExecCall(pThreadInfo->conn, buffer);
×
UNCOV
1551
            if (trying != -1) {
×
1552
                trying--;
×
1553
            }
1554
        }
1555
    }
1556

1557
    tmfree(buffer);
×
1558

UNCOV
1559
    return ret;
×
1560
}
1561

UNCOV
1562
/*
 * Release the thread's JSON array (if any) and print the thread's insert
 * summary: total inserted rows and rows per second.
 *
 * pThreadInfo  worker context; nothing happens when it is NULL.
 * mode         label printed in the summary line.
 */
static void cleanupAndPrint(threadInfo *pThreadInfo, char *mode) {
    if (NULL == pThreadInfo) {
        return;
    }

    if (pThreadInfo->json_array != NULL) {
        tools_cJSON_Delete(pThreadInfo->json_array);
        pThreadInfo->json_array = NULL;
    }

    // a zero delay would make the rate below a division by zero
    if (pThreadInfo->totalDelay == 0) {
        pThreadInfo->totalDelay = 1;
    }

    // totalDelay is scaled by 1E6 to obtain the "per second" divisor
    double elapsedSec = (double)pThreadInfo->totalDelay / 1E6;
    double rowsPerSec = (double)(pThreadInfo->totalInsertRows / elapsedSec);

    succPrint(
        "thread[%d] %s mode, completed total inserted rows: %" PRIu64
        ", %.2f records/second\n",
        pThreadInfo->threadID,
        mode,
        pThreadInfo->totalInsertRows,
        rowsPerSec);
}
×
1581

1582
/*
 * Decide whether the next row gets an out-of-order timestamp.
 *
 * stbInfo        super table settings (disorderRatio, disorderRange,
 *                startTimestamp).
 * disorderRange  in/out countdown; decremented on every disordered row and
 *                reloaded from stbInfo->disorderRange when it reaches 0.
 *
 * Returns 0 when the row keeps its regular timestamp, otherwise
 * startTimestamp minus the current countdown value.
 */
static int64_t getDisorderTs(SSuperTable *stbInfo, int *disorderRange) {
    // disorder disabled
    if (stbInfo->disorderRatio <= 0) {
        return 0;
    }

    // draw 0..99 and compare against the configured ratio
    int rand_num = taosRandom() % 100;
    if (rand_num >= stbInfo->disorderRatio) {
        return 0;
    }

    *disorderRange -= 1;
    if (*disorderRange == 0) {
        *disorderRange = stbInfo->disorderRange;
    }

    int64_t shiftedTs = stbInfo->startTimestamp - *disorderRange;
    debugPrint("rand_num: %d, < disorderRatio: %d, "
               "disorderTs: %"PRId64"\n",
               rand_num, stbInfo->disorderRatio,
               shiftedTs);
    return shiftedTs;
}
1601

1602
// Hand one comma separated "db.table" list to taos_load_table_info() and
// report a failure instead of silently dropping it.
static void preloadTableBatch(threadInfo *pThreadInfo, const char *tables) {
    // the client API expects the native TAOS handle, not the bench wrapper
    if (taos_load_table_info(pThreadInfo->conn->taos, tables) != 0) {
        errorPrint("taos_load_table_info() failed, reason: %s\n",
                   taos_errstr(NULL));
    }
}

/*
 * Pre-load the meta data of the thread's child tables.
 *
 * Does nothing unless g_arguments->pre_load_tb_meta is set and the thread
 * owns a connection. Table names are collected as "db.table" entries in a
 * 100 KiB buffer that is flushed whenever it is nearly full and after the
 * last table. The time spent is added to pThreadInfo->totalDelay.
 *
 * NOTE(review): the range walked is [start_table_from, end_table_to), the
 * same as the original loop; confirm whether end_table_to is meant to be
 * inclusive here.
 */
void loadChildTableInfo(threadInfo* pThreadInfo) {
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
    if(!g_arguments->pre_load_tb_meta) {
        return ;
    }
    if(pThreadInfo->conn == NULL) {
        return ;
    }

    char *db    = pThreadInfo->dbInfo->dbName;
    int64_t cnt = pThreadInfo->end_table_to - pThreadInfo->start_table_from;

    // 100k
    int   bufLen = 100 * 1024;
    char *buf    = benchCalloc(1, bufLen, false);
    int   pos    = 0;
    infoPrint("start load child tables(%"PRId64") info...\n", cnt);
    int64_t start = toolsGetTimestampUs();
    for(int64_t i = pThreadInfo->start_table_from; i < pThreadInfo->end_table_to; i++) {
        SChildTable *childTbl = stbInfo->childTblArray[i];

        // separator only between entries, so the list never starts with ','
        int room = bufLen - pos;
        int n = snprintf(buf + pos, room, "%s%s.%s",
                         pos > 0 ? "," : "", db, childTbl->name);
        if (n >= room && pos > 0) {
            // entry does not fit behind the pending names: cut the partial
            // write off, send what is pending and retry on an empty buffer
            buf[pos] = '\0';
            preloadTableBatch(pThreadInfo, buf);
            pos  = 0;
            room = bufLen;
            n = snprintf(buf, room, "%s.%s", db, childTbl->name);
        }
        if (n < 0 || n >= room) {
            errorPrint("%s() LN%d snprintf overflow on table %s.%s\n",
                       __func__, __LINE__, db, childTbl->name);
            buf[pos] = '\0';
        } else {
            pos += n;
        }

        // flush when nearly full or after the last table
        if(pos > 0 && (pos >= bufLen - 256 || i + 1 == pThreadInfo->end_table_to)) {
            preloadTableBatch(pThreadInfo, buf);
            pos = 0;
        }
    }
    int64_t delay = toolsGetTimestampUs() - start;
    infoPrint("end load child tables info. delay=%.2fs\n", delay/1E6);
    pThreadInfo->totalDelay += delay;

    tmfree(buf);
}
1635

1636
// create conn again
1637
/*
 * Get the thread a usable stmt2 handle again after a failed submit.
 *
 * Step 1: if the thread still owns a connection, close its stmt2 handle and
 *         try to create a new one on the same connection.
 * Step 2: if that fails (or there is no connection left from an earlier
 *         failed attempt), drop the connection, open a new one, create the
 *         stmt2 handle on it and select the database.
 *
 * Returns 0 on success, -1 on failure. After a failed initBenchConn()
 * pThreadInfo->conn is NULL; calling this function again is safe and goes
 * straight to step 2.
 */
int32_t reCreateConn(threadInfo * pThreadInfo) {
    // single
    bool single = true;
    if (pThreadInfo->dbInfo->superTbls->size > 1) {
        single = false;
    }

    // a previous failed call may have left conn == NULL, so every access to
    // the old connection has to be guarded
    if (pThreadInfo->conn != NULL) {
        //
        // retry stmt2 init
        //

        // stmt2 close
        if (pThreadInfo->conn->stmt2) {
            taos_stmt2_close(pThreadInfo->conn->stmt2);
            pThreadInfo->conn->stmt2 = NULL;
        }

        // retry stmt2 init , maybe success
        pThreadInfo->conn->stmt2 = initStmt2(pThreadInfo->conn->taos, single);
        if (pThreadInfo->conn->stmt2) {
            succPrint("%s", "reCreateConn first taos_stmt2_init() success and return.\n");
            return 0;
        }

        //
        // close old
        //
        closeBenchConn(pThreadInfo->conn);
        pThreadInfo->conn = NULL;
    }

    //
    // create new
    //

    // conn
    pThreadInfo->conn = initBenchConn();
    if (pThreadInfo->conn == NULL) {
        errorPrint("%s", "reCreateConn initBenchConn failed.\n");
        return -1;
    }
    // stmt2
    pThreadInfo->conn->stmt2 = initStmt2(pThreadInfo->conn->taos, single);
    if (NULL == pThreadInfo->conn->stmt2) {
        errorPrint("reCreateConn taos_stmt2_init() failed, reason: %s\n", taos_errstr(NULL));
        return -1;
    }

    succPrint("%s", "reCreateConn second taos_stmt2_init() success.\n");
    // select db
    if (taos_select_db(pThreadInfo->conn->taos, pThreadInfo->dbInfo->dbName)) {
        errorPrint("second taos select database(%s) failed\n", pThreadInfo->dbInfo->dbName);
        return -1;
    }

    return 0;
}
1693

1694
// reinit
1695
/*
 * Re-establish the stmt2 handle via reCreateConn() and prepare it again for
 * the thread's super table.
 *
 * w  passed through to prepareStmt2() unchanged.
 *
 * Returns 0 on success, otherwise the non-zero code of the step that failed.
 */
int32_t reConnectStmt2(threadInfo * pThreadInfo, int32_t w) {
    // the handle must exist before it can be prepared
    int32_t rc = reCreateConn(pThreadInfo);
    if (0 == rc) {
        rc = prepareStmt2(pThreadInfo->conn->stmt2, pThreadInfo->stbInfo, NULL, w, pThreadInfo->dbInfo->dbName);
    }
    return rc;
}
1710

1711
/*
 * Bind one prepared batch to the thread's stmt2 handle and execute it once.
 *
 * bindv      batch passed to taos_stmt2_bind_param().
 * delay1     accumulates the time spent up to the end of the bind step.
 * delay3     passed through to execInsert().
 * startTs    receives the timestamp taken right before execInsert().
 * endTs      receives the timestamp taken right after execInsert().
 * generated  number of rows handed to execInsert().
 *
 * Returns the bind error code if binding fails (startTs/endTs are then left
 * untouched), otherwise the result of execInsert().
 */
int32_t submitStmt2Impl(threadInfo * pThreadInfo, TAOS_STMT2_BINDV *bindv, int64_t *delay1, int64_t *delay3,
                    int64_t* startTs, int64_t* endTs, uint32_t* generated) {
    // bind phase
    const int64_t bindBegin = toolsGetTimestampUs();
    int32_t rc = taos_stmt2_bind_param(pThreadInfo->conn->stmt2, bindv, -1);
    if (0 != rc) {
        errorPrint("taos_stmt2_bind_param failed, reason: %s\n", taos_stmt2_error(pThreadInfo->conn->stmt2));
        return rc;
    }
    debugPrint("interlace taos_stmt2_bind_param() ok.  bindv->count=%d \n", bindv->count);
    *delay1 += toolsGetTimestampUs() - bindBegin;

    // execute phase, bracketed by the caller-visible timestamps
    *startTs = toolsGetTimestampUs();
    rc = execInsert(pThreadInfo, *generated, delay3);
    *endTs = toolsGetTimestampUs();

    return rc;
}
1729

UNCOV
1730
/*
 * Submit a stmt2 batch, re-connecting and retrying when it fails.
 *
 * The number of attempts is 1 unless stbInfo->continueIfFail is
 * YES_IF_FAILED; in that case it is stbInfo->keep_trying when that is
 * greater than 1, else 3. Between attempts the thread sleeps for
 * stbInfo->trying_interval and calls reConnectStmt2(); an attempt whose
 * reconnect failed still counts but does not submit.
 *
 * w  passed through to reConnectStmt2().
 *
 * Returns 0 once a submit succeeds, -1 when all attempts are used up.
 */
int32_t submitStmt2(threadInfo * pThreadInfo, TAOS_STMT2_BINDV *bindv, int64_t *delay1, int64_t *delay3,
                    int64_t* startTs, int64_t* endTs, uint32_t* generated, int32_t w) {
    SSuperTable* stbInfo = pThreadInfo->stbInfo;

    // how many attempts are allowed in total
    int32_t attemptsLeft = 1;
    if (stbInfo->continueIfFail == YES_IF_FAILED) {
        attemptsLeft = (stbInfo->keep_trying > 1) ? stbInfo->keep_trying : 3;
    }

    int32_t retries   = 0;     // retries started so far, used for logging
    bool    connected = true;  // false while the last reconnect failed

    for (;;) {
        // only submit when the stmt2 handle is believed to be usable
        if (connected
                && submitStmt2Impl(pThreadInfo, bindv, delay1, delay3, startTs, endTs, generated) == 0) {
            return 0;
        }

        // this attempt failed; give up when it was the last one
        attemptsLeft -= 1;
        if (attemptsLeft == 0) {
            char tip[64] = "";
            if (retries > 0) {
                snprintf(tip, sizeof(tip), " after retry %d", retries);
            }
            errorPrint("finally faild execute submitStmt2()%s\n", tip);
            return -1;
        }

        // wait a moment, then rebuild the stmt2 handle before trying again
        toolsMsleep(stbInfo->trying_interval);
        infoPrint("stmt2 start retry submit i=%d  after sleep %d ms...\n", retries++, stbInfo->trying_interval);

        connected = (reConnectStmt2(pThreadInfo, w) == 0);
        if (!connected) {
            // reconnect failed; the next round skips the submit and retries
            errorPrint("faild reConnectStmt2 and retry again for next i=%d \n", retries);
        }
    }
}
1788

UNCOV
1789
static void *syncWriteInterlace(void *sarg) {
×
UNCOV
1790
    threadInfo * pThreadInfo = (threadInfo *)sarg;
×
UNCOV
1791
    SDataBase *  database = pThreadInfo->dbInfo;
×
1792
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
×
UNCOV
1793
    infoPrint(
×
1794
              "thread[%d] start interlace inserting into table from "
1795
              "%" PRIu64 " to %" PRIu64 "\n",
1796
              pThreadInfo->threadID, pThreadInfo->start_table_from,
1797
              pThreadInfo->end_table_to);
1798

1799
    int64_t insertRows = stbInfo->insertRows;
×
1800
    int32_t interlaceRows = stbInfo->interlaceRows;
×
1801
    uint32_t nBatchTable  = g_arguments->reqPerReq / interlaceRows;
×
UNCOV
1802
    uint64_t   lastPrintTime = toolsGetTimestampMs();
×
1803
    uint64_t   lastTotalInsertRows = 0;
×
1804
    int64_t   startTs = toolsGetTimestampUs();
×
1805
    int64_t   endTs;
UNCOV
1806
    uint64_t   tableSeq = pThreadInfo->start_table_from;
×
UNCOV
1807
    int disorderRange = stbInfo->disorderRange;
×
1808
    int32_t i = 0;
×
1809

1810
    loadChildTableInfo(pThreadInfo);
×
1811
    // check if filling back mode
1812
    bool fillBack = false;
×
UNCOV
1813
    if(stbInfo->useNow && stbInfo->startFillbackTime) {
×
1814
        fillBack = true;
×
1815
        pThreadInfo->start_time = stbInfo->startFillbackTime;
×
UNCOV
1816
        infoPrint("start time change to startFillbackTime = %"PRId64" \n", pThreadInfo->start_time);
×
1817
    }
1818

UNCOV
1819
    FILE* csvFile = NULL;
×
UNCOV
1820
    char* tagData = NULL;
×
UNCOV
1821
    int   w       = 0; // record tags position, if w > TAG_BATCH_COUNT , need recreate new tag values
×
UNCOV
1822
    if (stbInfo->autoTblCreating) {
×
UNCOV
1823
        csvFile = openTagCsv(stbInfo, pThreadInfo->start_table_from);
×
1824
        tagData = benchCalloc(TAG_BATCH_COUNT, stbInfo->lenOfTags, false);
×
1825
    }
UNCOV
1826
    int64_t delay1 = 0;
×
1827
    int64_t delay2 = 0;
×
1828
    int64_t delay3 = 0;
×
1829
    bool    firstInsertTb = true;
×
1830

1831
    TAOS_STMT2_BINDV *bindv = NULL;
×
1832

1833
    // create bindv
UNCOV
1834
    if(stbInfo->iface == STMT2_IFACE) {
×
UNCOV
1835
        int32_t tagCnt = stbInfo->autoTblCreating ? stbInfo->tags->size : 0;
×
UNCOV
1836
        if (csvFile) {
×
1837
            tagCnt = 0;
×
1838
        }
1839
        //int32_t tagCnt = stbInfo->tags->size;
1840
        bindv = createBindV(nBatchTable,  tagCnt, stbInfo->cols->size + 1);
×
1841
    }
1842

UNCOV
1843
    bool oldInitStmt = stbInfo->autoTblCreating;
×
1844
    // not auto create table call once
1845
    if(stbInfo->iface == STMT_IFACE && !oldInitStmt) {
×
1846
        debugPrint("call prepareStmt for stable:%s\n", stbInfo->stbName);
×
UNCOV
1847
        if (prepareStmt(pThreadInfo->conn->stmt, stbInfo, tagData, w, database->dbName)) {
×
1848
            g_fail = true;
×
UNCOV
1849
            goto free_of_interlace;
×
1850
        }
1851
    }
1852
    else if (stbInfo->iface == STMT2_IFACE) {
×
1853
        // only prepare once
1854
        if (prepareStmt2(pThreadInfo->conn->stmt2, stbInfo, NULL, w, database->dbName)) {
×
UNCOV
1855
            g_fail = true;
×
UNCOV
1856
            goto free_of_interlace;
×
1857
        }
1858
    }    
1859
    int64_t index = tableSeq;
×
1860
    while (insertRows > 0) {
×
1861
        int64_t tmp_total_insert_rows = 0;
×
1862
        uint32_t generated = 0;
×
UNCOV
1863
        if (insertRows <= interlaceRows) {
×
1864
            interlaceRows = insertRows;
×
1865
        }
1866

1867
        // loop each table
UNCOV
1868
        for (i = 0; i < nBatchTable; i++) {
×
1869
            if (g_arguments->terminate) {
×
UNCOV
1870
                goto free_of_interlace;
×
1871
            }
1872
            int64_t pos = pThreadInfo->pos;
×
1873
            
1874
            // get childTable
1875
            SChildTable *childTbl;
UNCOV
1876
            if (g_arguments->bind_vgroup) {
×
UNCOV
1877
                childTbl = pThreadInfo->vg->childTblArray[tableSeq];
×
1878
            } else {
UNCOV
1879
                childTbl = stbInfo->childTblArray[tableSeq];
×
1880
            }
1881
            
NEW
1882
            char*  tableName   = childTbl->name;
×
1883
            char *sampleDataBuf = childTbl->useOwnSample?
×
1884
                                        childTbl->sampleDataBuf:
×
1885
                                        stbInfo->sampleDataBuf;
1886
            // init ts
1887
            if(childTbl->ts == 0) {
×
UNCOV
1888
               childTbl->ts = pThreadInfo->start_time;
×
1889
            }
1890
            char ttl[SMALL_BUFF_LEN] = "";
×
UNCOV
1891
            if (stbInfo->ttl != 0) {
×
1892
                snprintf(ttl, SMALL_BUFF_LEN, "TTL %d", stbInfo->ttl);
×
1893
            }
1894
            switch (stbInfo->iface) {
×
UNCOV
1895
                case REST_IFACE:
×
1896
                case TAOSC_IFACE: {
1897
                    char escapedTbName[TSDB_TABLE_NAME_LEN+2] = "\0";
×
1898
                    if (g_arguments->escape_character) {
×
1899
                        snprintf(escapedTbName, TSDB_TABLE_NAME_LEN+2, "`%s`",
×
1900
                                tableName);
1901
                    } else {
1902
                        snprintf(escapedTbName, TSDB_TABLE_NAME_LEN+2, "%s",
×
1903
                                tableName);
1904
                    }
UNCOV
1905
                    if (i == 0) {
×
1906
                        ds_add_str(&pThreadInfo->buffer, STR_INSERT_INTO);
×
1907
                    }
1908

1909
                    // generator
1910
                    if (stbInfo->autoTblCreating && w == 0) {
×
UNCOV
1911
                        if(!generateTagData(stbInfo, tagData, TAG_BATCH_COUNT, csvFile, NULL, index)) {
×
UNCOV
1912
                            goto free_of_interlace;
×
1913
                        }
1914
                    }
1915

1916
                    // create child table
1917
                    if (stbInfo->partialColNum == stbInfo->cols->size) {
×
UNCOV
1918
                        if (stbInfo->autoTblCreating) {
×
UNCOV
1919
                            ds_add_strs(&pThreadInfo->buffer, 8,
×
1920
                                    escapedTbName,
1921
                                    " USING `",
1922
                                    stbInfo->stbName,
1923
                                    "` TAGS (",
UNCOV
1924
                                    tagData + stbInfo->lenOfTags * w,
×
1925
                                    ") ", ttl, " VALUES ");
1926
                        } else {
UNCOV
1927
                            ds_add_strs(&pThreadInfo->buffer, 2,
×
1928
                                    escapedTbName, " VALUES ");
1929
                        }
1930
                    } else {
UNCOV
1931
                        if (stbInfo->autoTblCreating) {
×
1932
                            ds_add_strs(&pThreadInfo->buffer, 10,
×
1933
                                        escapedTbName,
1934
                                        " (",
1935
                                        stbInfo->partialColNameBuf,
1936
                                        ") USING `",
1937
                                        stbInfo->stbName,
1938
                                        "` TAGS (",
UNCOV
1939
                                        tagData + stbInfo->lenOfTags * w,
×
1940
                                        ") ", ttl, " VALUES ");
1941
                        } else {
UNCOV
1942
                            ds_add_strs(&pThreadInfo->buffer, 4,
×
1943
                                        escapedTbName,
1944
                                        "(",
1945
                                        stbInfo->partialColNameBuf,
1946
                                        ") VALUES ");
1947
                        }
1948
                    }
1949

1950
                    // move next
UNCOV
1951
                    if (stbInfo->autoTblCreating && ++w >= TAG_BATCH_COUNT) {
×
1952
                        // reset for gen again
UNCOV
1953
                        w = 0;
×
UNCOV
1954
                        index += TAG_BATCH_COUNT;
×
1955
                    }  
1956

1957
                    // write child data with interlaceRows
UNCOV
1958
                    for (int64_t j = 0; j < interlaceRows; j++) {
×
UNCOV
1959
                        int64_t disorderTs = getDisorderTs(stbInfo,
×
1960
                                &disorderRange);
1961

1962
                        // change fillBack mode with condition
UNCOV
1963
                        if(fillBack) {
×
UNCOV
1964
                            int64_t tsnow = toolsGetTimestamp(database->precision);
×
1965
                            if(childTbl->ts >= tsnow){
×
UNCOV
1966
                                fillBack = false;
×
UNCOV
1967
                                infoPrint("fillBack mode set end. because timestamp(%"PRId64") >= now(%"PRId64")\n", childTbl->ts, tsnow);
×
1968
                            }
1969
                        }
1970

1971
                        // timestamp         
1972
                        char time_string[BIGINT_BUFF_LEN];
UNCOV
1973
                        if(stbInfo->useNow && stbInfo->interlaceRows == 1 && !fillBack) {
×
UNCOV
1974
                            int64_t now = toolsGetTimestamp(database->precision);
×
UNCOV
1975
                            snprintf(time_string, BIGINT_BUFF_LEN, "%"PRId64"", now);
×
1976
                        } else {
1977
                            snprintf(time_string, BIGINT_BUFF_LEN, "%"PRId64"",
×
1978
                                    disorderTs?disorderTs:childTbl->ts);
1979
                        }
1980

1981
                        // combine rows timestamp | other cols = sampleDataBuf[pos]
UNCOV
1982
                        if(stbInfo->useSampleTs) {
×
UNCOV
1983
                            ds_add_strs(&pThreadInfo->buffer, 3, "(", 
×
UNCOV
1984
                                        sampleDataBuf + pos * stbInfo->lenOfCols, ") ");
×
1985
                        } else {
UNCOV
1986
                            ds_add_strs(&pThreadInfo->buffer, 5, "(", time_string, ",",
×
UNCOV
1987
                                        sampleDataBuf + pos * stbInfo->lenOfCols, ") ");
×
1988
                        }
1989
                        // check buffer enough
UNCOV
1990
                        if (ds_len(pThreadInfo->buffer)
×
1991
                                > stbInfo->max_sql_len) {
×
1992
                            errorPrint("sql buffer length (%"PRIu64") "
×
1993
                                    "is larger than max sql length "
1994
                                    "(%"PRId64")\n",
1995
                                    ds_len(pThreadInfo->buffer),
1996
                                    stbInfo->max_sql_len);
1997
                            goto free_of_interlace;
×
1998
                        }
1999

2000
                        // move next
2001
                        generated++;
×
2002
                        pos++;
×
2003
                        if (pos >= g_arguments->prepared_rand) {
×
2004
                            pos = 0;
×
2005
                        }
UNCOV
2006
                        if(stbInfo->primary_key)
×
UNCOV
2007
                            debugPrint("add child=%s %"PRId64" pk cur=%d cnt=%d \n", childTbl->name, childTbl->ts, childTbl->pkCur, childTbl->pkCnt);
×
2008

2009
                        // primary key
UNCOV
2010
                        if (!stbInfo->primary_key || needChangeTs(stbInfo, &childTbl->pkCur, &childTbl->pkCnt)) {
×
2011
                            childTbl->ts += stbInfo->timestamp_step;
×
2012
                            if(stbInfo->primary_key)
×
2013
                                debugPrint("changedTs child=%s %"PRId64" pk cur=%d cnt=%d \n", childTbl->name, childTbl->ts, childTbl->pkCur, childTbl->pkCnt);
×
2014
                        }
2015
                        
2016
                    }
UNCOV
2017
                    break;
×
2018
                }
UNCOV
2019
                case STMT_IFACE: {
×
2020
                    char escapedTbName[TSDB_TABLE_NAME_LEN+2] = "\0";
×
2021
                    if (g_arguments->escape_character) {
×
2022
                        snprintf(escapedTbName, TSDB_TABLE_NAME_LEN+2,
×
2023
                                "`%s`", tableName);
2024
                    } else {
2025
                        snprintf(escapedTbName, TSDB_TABLE_NAME_LEN, "%s",
×
2026
                                tableName);
2027
                    }
2028

2029
                    // generator
2030
                    if (stbInfo->autoTblCreating && w == 0) {
×
UNCOV
2031
                        if(!generateTagData(stbInfo, tagData, TAG_BATCH_COUNT, csvFile, NULL, index)) {
×
UNCOV
2032
                            goto free_of_interlace;
×
2033
                        }
2034
                    }
2035
                    
2036
                    // old must call prepareStmt for each table
UNCOV
2037
                    if (oldInitStmt) {
×
UNCOV
2038
                        debugPrint("call prepareStmt for stable:%s\n", stbInfo->stbName);
×
2039
                        if (prepareStmt(pThreadInfo->conn->stmt, stbInfo, tagData, w, database->dbName)) {
×
2040
                            g_fail = true;
×
2041
                            goto free_of_interlace;
×
2042
                        }
2043
                    }
2044
      
2045
                    int64_t start = toolsGetTimestampUs();
×
UNCOV
2046
                    if (taos_stmt_set_tbname(pThreadInfo->conn->stmt,
×
2047
                                             escapedTbName)) {
2048
                        errorPrint(
×
2049
                            "taos_stmt_set_tbname(%s) failed, reason: %s\n",
2050
                            tableName,
2051
                                taos_stmt_errstr(pThreadInfo->conn->stmt));
UNCOV
2052
                        g_fail = true;
×
UNCOV
2053
                        goto free_of_interlace;
×
2054
                    }
2055
                    delay1 += toolsGetTimestampUs() - start;
×
2056

2057
                    int32_t n = 0;
×
2058
                    generated += bindParamBatch(pThreadInfo, interlaceRows,
×
2059
                                       childTbl->ts, pos, childTbl, &childTbl->pkCur, &childTbl->pkCnt, &n, &delay2, &delay3);
2060
                    
2061
                    // move next
UNCOV
2062
                    pos += interlaceRows;
×
2063
                    if (pos + interlaceRows + 1 >= g_arguments->prepared_rand) {
×
UNCOV
2064
                        pos = 0;
×
2065
                    }
UNCOV
2066
                    childTbl->ts += stbInfo->timestamp_step * n;
×
2067

2068
                    // move next
2069
                    if (stbInfo->autoTblCreating) {
×
2070
                        w += 1;
×
UNCOV
2071
                        if (w >= TAG_BATCH_COUNT) {
×
2072
                            // reset for gen again
UNCOV
2073
                            w = 0;
×
UNCOV
2074
                            index += TAG_BATCH_COUNT;
×
2075
                        }
2076
                    }
2077

2078
                    break;
×
2079
                }
UNCOV
2080
                case STMT2_IFACE: {
×
2081
                    // tbnames
UNCOV
2082
                    bindv->tbnames[i] = childTbl->name;
×
2083

2084
                    // tags
UNCOV
2085
                    if (stbInfo->autoTblCreating && firstInsertTb) {
×
2086
                        // create
UNCOV
2087
                        if (w == 0) {
×
2088
                            // recreate sample tags
UNCOV
2089
                            if(!generateTagData(stbInfo, tagData, TAG_BATCH_COUNT, csvFile, pThreadInfo->tagsStmt, index)) {
×
2090
                                goto free_of_interlace;
×
2091
                            }
2092
                        }
2093

UNCOV
2094
                        if (csvFile) {
×
2095
                            if (prepareStmt2(pThreadInfo->conn->stmt2, stbInfo, tagData, w, database->dbName)) {
×
2096
                                g_fail = true;
×
UNCOV
2097
                                goto free_of_interlace;
×
2098
                            }
2099
                        }
2100
                    
2101
                        bindVTags(bindv, i, w, pThreadInfo->tagsStmt);
×
2102
                    }
2103

2104
                    // cols
UNCOV
2105
                    int32_t n = 0;
×
UNCOV
2106
                    generated += bindVColsInterlace(bindv, i, pThreadInfo, interlaceRows, childTbl->ts, pos, 
×
2107
                                                    childTbl, &childTbl->pkCur, &childTbl->pkCnt, &n);                    
2108
                    // move next
2109
                    pos += interlaceRows;
×
UNCOV
2110
                    if (pos + interlaceRows + 1 >= g_arguments->prepared_rand) {
×
2111
                        pos = 0;
×
2112
                    }
UNCOV
2113
                    childTbl->ts += stbInfo->timestamp_step * n;
×
UNCOV
2114
                    if (stbInfo->autoTblCreating) {
×
UNCOV
2115
                        w += 1;
×
2116
                        if (w >= TAG_BATCH_COUNT) {
×
2117
                            // reset for gen again
2118
                            w = 0;
×
UNCOV
2119
                            index += TAG_BATCH_COUNT;
×
2120
                        }
2121
                    }
2122

2123
                    break;
×
2124
                }
2125
                case SML_REST_IFACE:
×
2126
                case SML_IFACE: {
2127
                    int protocol = stbInfo->lineProtocol;
×
2128
                    for (int64_t j = 0; j < interlaceRows; j++) {
×
UNCOV
2129
                        int64_t disorderTs = getDisorderTs(stbInfo,
×
2130
                                &disorderRange);
UNCOV
2131
                        if (TSDB_SML_JSON_PROTOCOL == protocol) {
×
2132
                            tools_cJSON *tag = tools_cJSON_Duplicate(
×
2133
                                tools_cJSON_GetArrayItem(
×
2134
                                    pThreadInfo->sml_json_tags,
×
2135
                                    (int)tableSeq -
×
UNCOV
2136
                                        pThreadInfo->start_table_from),
×
2137
                                    true);
UNCOV
2138
                            generateSmlJsonCols(
×
2139
                                pThreadInfo->json_array, tag, stbInfo,
UNCOV
2140
                                database->sml_precision,
×
2141
                                    disorderTs?disorderTs:childTbl->ts);
UNCOV
2142
                        } else if (SML_JSON_TAOS_FORMAT == protocol) {
×
2143
                            tools_cJSON *tag = tools_cJSON_Duplicate(
×
2144
                                tools_cJSON_GetArrayItem(
×
UNCOV
2145
                                    pThreadInfo->sml_json_tags,
×
UNCOV
2146
                                    (int)tableSeq -
×
2147
                                        pThreadInfo->start_table_from),
×
2148
                                    true);
2149
                            generateSmlTaosJsonCols(
×
2150
                                pThreadInfo->json_array, tag, stbInfo,
2151
                                database->sml_precision,
×
2152
                                disorderTs?disorderTs:childTbl->ts);
2153
                        } else if (TSDB_SML_LINE_PROTOCOL == protocol) {
×
2154
                            snprintf(
×
UNCOV
2155
                                pThreadInfo->lines[generated],
×
2156
                                stbInfo->lenOfCols + stbInfo->lenOfTags,
×
2157
                                "%s %s %" PRId64,
2158
                                pThreadInfo
UNCOV
2159
                                    ->sml_tags[(int)tableSeq -
×
UNCOV
2160
                                               pThreadInfo->start_table_from],
×
2161
                                    sampleDataBuf + pos * stbInfo->lenOfCols,
×
2162
                                disorderTs?disorderTs:childTbl->ts);
2163
                        } else {
UNCOV
2164
                            snprintf(
×
2165
                                pThreadInfo->lines[generated],
×
2166
                                stbInfo->lenOfCols + stbInfo->lenOfTags,
×
2167
                                "%s %" PRId64 " %s %s", stbInfo->stbName,
2168
                                disorderTs?disorderTs:childTbl->ts,
2169
                                    sampleDataBuf + pos * stbInfo->lenOfCols,
×
2170
                                pThreadInfo
2171
                                    ->sml_tags[(int)tableSeq -
×
2172
                                               pThreadInfo->start_table_from]);
×
2173
                        }
2174
                        generated++;
×
2175
                        // primary key
2176
                        if (!stbInfo->primary_key || needChangeTs(stbInfo, &childTbl->pkCur, &childTbl->pkCnt)) {
×
UNCOV
2177
                            childTbl->ts += stbInfo->timestamp_step;
×
2178
                        }
2179
                    }
2180
                    if (TSDB_SML_JSON_PROTOCOL == protocol
×
2181
                            || SML_JSON_TAOS_FORMAT == protocol) {
×
2182
                        pThreadInfo->lines[0] =
×
2183
                            tools_cJSON_PrintUnformatted(
×
2184
                                pThreadInfo->json_array);
×
2185
                    }
UNCOV
2186
                    break;
×
2187
                }
2188
            }
2189

2190
            // move to next table in one batch
2191
            tableSeq++;
×
2192
            tmp_total_insert_rows += interlaceRows;
×
2193
            if (tableSeq > pThreadInfo->end_table_to) {
×
2194
                // first insert tables loop is end
UNCOV
2195
                firstInsertTb = false;
×
2196
                // one tables loop timestamp and pos add 
2197
                tableSeq = pThreadInfo->start_table_from;
×
2198
                // save    
2199
                pThreadInfo->pos = pos;    
×
UNCOV
2200
                if (!stbInfo->non_stop) {
×
UNCOV
2201
                    insertRows -= interlaceRows;
×
2202
                }
2203

2204
                // if fillBack mode , can't sleep
UNCOV
2205
                if (stbInfo->insert_interval > 0 && !fillBack) {
×
UNCOV
2206
                    debugPrint("%s() LN%d, insert_interval: %"PRIu64"\n",
×
2207
                          __func__, __LINE__, stbInfo->insert_interval);
UNCOV
2208
                    perfPrint("sleep %" PRIu64 " ms\n",
×
2209
                                     stbInfo->insert_interval);
2210
                    toolsMsleep((int32_t)stbInfo->insert_interval);
×
2211
                }
2212

UNCOV
2213
                i++;
×
2214
                // rectify bind count
2215
                if (bindv && bindv->count != i) {
×
UNCOV
2216
                    bindv->count = i;
×
2217
                }                
2218
                break;
×
2219
            }
2220
        }
2221

2222
        // exec
UNCOV
2223
        if(stbInfo->iface == STMT2_IFACE) {
×
2224
            // exec stmt2
UNCOV
2225
            if(g_arguments->debug_print)
×
UNCOV
2226
                showBindV(bindv, stbInfo->tags, stbInfo->cols);
×
2227
            // bind & exec stmt2
UNCOV
2228
            if (submitStmt2(pThreadInfo, bindv, &delay1, &delay3, &startTs, &endTs, &generated, w) != 0) {
×
2229
                g_fail = true;
×
2230
                goto free_of_interlace;
×
2231
            }
2232
        } else {
2233
            // exec other
UNCOV
2234
            startTs = toolsGetTimestampUs();
×
2235
            if (execInsert(pThreadInfo, generated, &delay3)) {
×
UNCOV
2236
                g_fail = true;
×
2237
                goto free_of_interlace;
×
2238
            }
2239
            endTs = toolsGetTimestampUs();
×
2240
        }
2241

UNCOV
2242
        debugPrint("execInsert tableIndex=%d left insert rows=%"PRId64" generated=%d\n", i, insertRows, generated);
×
2243
                
2244
        // reset count
UNCOV
2245
        if(bindv) {
×
2246
            bindv->count = 0;
×
2247
        }            
2248

UNCOV
2249
        pThreadInfo->totalInsertRows += tmp_total_insert_rows;
×
2250

2251
        if (g_arguments->terminate) {
×
UNCOV
2252
            goto free_of_interlace;
×
2253
        }
2254

UNCOV
2255
        int protocol = stbInfo->lineProtocol;
×
2256
        switch (stbInfo->iface) {
×
UNCOV
2257
            case TAOSC_IFACE:
×
2258
            case REST_IFACE:
UNCOV
2259
                debugPrint("pThreadInfo->buffer: %s\n",
×
2260
                           pThreadInfo->buffer);
2261
                free_ds(&pThreadInfo->buffer);
×
UNCOV
2262
                pThreadInfo->buffer = new_ds(0);
×
2263
                break;
×
2264
            case SML_REST_IFACE:
×
UNCOV
2265
                memset(pThreadInfo->buffer, 0,
×
2266
                       g_arguments->reqPerReq * (pThreadInfo->max_sql_len + 1));
×
2267
            case SML_IFACE:
×
2268
                if (TSDB_SML_JSON_PROTOCOL == protocol
×
UNCOV
2269
                        || SML_JSON_TAOS_FORMAT == protocol) {
×
UNCOV
2270
                    debugPrint("pThreadInfo->lines[0]: %s\n",
×
2271
                               pThreadInfo->lines[0]);
2272
                    if (pThreadInfo->json_array && !g_arguments->terminate) {
×
2273
                        tools_cJSON_Delete(pThreadInfo->json_array);
×
2274
                        pThreadInfo->json_array = NULL;
×
2275
                    }
UNCOV
2276
                    pThreadInfo->json_array = tools_cJSON_CreateArray();
×
2277
                    if (pThreadInfo->lines && pThreadInfo->lines[0]) {
×
UNCOV
2278
                        tmfree(pThreadInfo->lines[0]);
×
UNCOV
2279
                        pThreadInfo->lines[0] = NULL;
×
2280
                    }
2281
                } else {
UNCOV
2282
                    for (int j = 0; j < generated; j++) {
×
2283
                        if (pThreadInfo && pThreadInfo->lines
×
2284
                                && !g_arguments->terminate) {
×
UNCOV
2285
                            debugPrint("pThreadInfo->lines[%d]: %s\n", j,
×
2286
                                       pThreadInfo->lines[j]);
2287
                            memset(pThreadInfo->lines[j], 0,
×
2288
                                   pThreadInfo->max_sql_len);
2289
                        }
2290
                    }
2291
                }
UNCOV
2292
                break;
×
2293
            case STMT_IFACE:
×
2294
                break;
×
2295
        }
2296

2297
        int64_t delay4 = endTs - startTs;
×
UNCOV
2298
        int64_t delay = delay1 + delay2 + delay3 + delay4;
×
2299
        if (delay <=0) {
×
2300
            debugPrint("thread[%d]: startTS: %"PRId64", endTS: %"PRId64"\n",
×
2301
                       pThreadInfo->threadID, startTs, endTs);
2302
        } else {
2303
            perfPrint("insert execution time is %10.2f ms\n",
×
2304
                      delay / 1E6);
2305

2306
            int64_t * pdelay = benchCalloc(1, sizeof(int64_t), false);
×
2307
            *pdelay = delay;
×
2308
            if (benchArrayPush(pThreadInfo->delayList, pdelay) == NULL) {
×
UNCOV
2309
                tmfree(pdelay);
×
2310
            }
2311
            pThreadInfo->totalDelay += delay;
×
2312
            pThreadInfo->totalDelay1 += delay1;
×
UNCOV
2313
            pThreadInfo->totalDelay2 += delay2;
×
2314
            pThreadInfo->totalDelay3 += delay3;
×
2315
        }
2316
        delay1 = delay2 = delay3 = 0;
×
2317

UNCOV
2318
        int64_t currentPrintTime = toolsGetTimestampMs();
×
UNCOV
2319
        if (currentPrintTime - lastPrintTime > 30 * 1000) {
×
2320
            infoPrint(
×
2321
                    "thread[%d] has currently inserted rows: %" PRIu64
2322
                    ", peroid insert rate: %.3f rows/s \n",
2323
                    pThreadInfo->threadID, pThreadInfo->totalInsertRows,
2324
                    (double)(pThreadInfo->totalInsertRows - lastTotalInsertRows) * 1000.0/(currentPrintTime - lastPrintTime));
2325
            lastPrintTime = currentPrintTime;
×
UNCOV
2326
                lastTotalInsertRows = pThreadInfo->totalInsertRows;
×
2327
        }
2328
    }
2329

2330
free_of_interlace:
×
2331
    cleanupAndPrint(pThreadInfo, "interlace");
×
2332
    if(csvFile) {
×
UNCOV
2333
        fclose(csvFile);
×
2334
    }
2335
    tmfree(tagData);
×
2336
    freeBindV(bindv);
×
2337
    return NULL;
×
2338
}
2339

UNCOV
2340
static int32_t prepareProgressDataStmt(
×
2341
        threadInfo *pThreadInfo,
2342
        SChildTable *childTbl,
2343
        int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt, int64_t *delay1, int64_t *delay2, int64_t *delay3) {
2344
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
×
2345
    char escapedTbName[TSDB_TABLE_NAME_LEN + 2] = "\0";
×
2346
    if (g_arguments->escape_character) {
×
2347
        snprintf(escapedTbName, TSDB_TABLE_NAME_LEN + 2,
×
2348
                 "`%s`", childTbl->name);
2349
    } else {
2350
        snprintf(escapedTbName, TSDB_TABLE_NAME_LEN, "%s",
×
2351
                 childTbl->name);
2352
    }
UNCOV
2353
    int64_t start = toolsGetTimestampUs();
×
2354
    if (taos_stmt_set_tbname(pThreadInfo->conn->stmt,
×
2355
                             escapedTbName)) {
2356
        errorPrint(
×
2357
                "taos_stmt_set_tbname(%s) failed,"
2358
                "reason: %s\n", escapedTbName,
2359
                taos_stmt_errstr(pThreadInfo->conn->stmt));
UNCOV
2360
        return -1;
×
2361
    }
UNCOV
2362
    *delay1 = toolsGetTimestampUs() - start;
×
2363
    int32_t n = 0;
×
2364
    int64_t pos = i % g_arguments->prepared_rand;
×
UNCOV
2365
    if (g_arguments->prepared_rand - pos < g_arguments->reqPerReq) {
×
2366
        // remain prepare data less than batch, reset pos to zero
UNCOV
2367
        pos = 0;
×
2368
    }
2369
    int32_t generated = bindParamBatch(
×
2370
            pThreadInfo,
2371
            (g_arguments->reqPerReq > (stbInfo->insertRows - i))
×
2372
                ? (stbInfo->insertRows - i)
2373
                : g_arguments->reqPerReq,
×
2374
            *timestamp, pos, childTbl, pkCur, pkCnt, &n, delay2, delay3);
2375
    *timestamp += n * stbInfo->timestamp_step;
×
UNCOV
2376
    return generated;
×
2377
}
2378

UNCOV
2379
/*
 * Occasionally replace *timestamp with an out-of-order value.
 *
 * A number is drawn as taosRandom() % 100; when it is below
 * stbInfo->disorderRatio the timestamp becomes startTimestamp minus an
 * offset taken from disorderRange (disorderRange - 1, or disorderRange
 * itself when that difference is zero). Otherwise *timestamp is untouched.
 */
static void makeTimestampDisorder(
        int64_t *timestamp, SSuperTable *stbInfo) {
    int64_t base = stbInfo->startTimestamp;
    int roll = taosRandom() % 100;

    if (roll >= stbInfo->disorderRatio) {
        // this row keeps its regular timestamp
        return;
    }

    int offset = stbInfo->disorderRange - 1;
    if (0 == offset) {
        offset = stbInfo->disorderRange;
    }

    *timestamp = base - offset;
    debugPrint("rand_num: %d, < disorderRatio: %d"
               ", ts: %"PRId64"\n",
               roll,
               stbInfo->disorderRatio,
               *timestamp);
}
×
2397

2398
static int32_t prepareProgressDataSmlJsonText(
×
2399
    threadInfo *pThreadInfo,
2400
    uint64_t tableSeq,
2401
    int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt) {
2402
    // prepareProgressDataSmlJsonText
2403
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
×
UNCOV
2404
    int32_t generated = 0;
×
2405

UNCOV
2406
    int len = 0;
×
2407

UNCOV
2408
    char *line = pThreadInfo->lines[0];
×
2409
    uint32_t line_buf_len = pThreadInfo->line_buf_len;
×
2410

2411
    strncat(line + len, "[", 2);
×
UNCOV
2412
    len += 1;
×
2413

2414
    int32_t pos = 0;
×
UNCOV
2415
    for (int j = 0; (j < g_arguments->reqPerReq)
×
UNCOV
2416
            && !g_arguments->terminate; j++) {
×
2417
        strncat(line + len, "{", 2);
×
UNCOV
2418
        len += 1;
×
2419
        int n;
2420
        n = snprintf(line + len, line_buf_len - len,
×
2421
                 "\"timestamp\":%"PRId64",", *timestamp);
2422
        if (n < 0 || n >= line_buf_len - len) {
×
2423
            errorPrint("%s() LN%d snprintf overflow on %d\n",
×
2424
                       __func__, __LINE__, j);
2425
            return -1;
×
2426
        } else {
2427
            len += n;
×
2428
        }
2429

UNCOV
2430
        n = snprintf(line + len, line_buf_len - len, "%s",
×
UNCOV
2431
                        pThreadInfo->sml_json_value_array[tableSeq]);
×
UNCOV
2432
        if (n < 0 || n >= line_buf_len - len) {
×
UNCOV
2433
            errorPrint("%s() LN%d snprintf overflow on %d\n",
×
2434
                       __func__, __LINE__, j);
UNCOV
2435
            return -1;
×
2436
        } else {
UNCOV
2437
            len += n;
×
2438
        }
UNCOV
2439
        n = snprintf(line + len, line_buf_len - len, "\"tags\":%s,",
×
UNCOV
2440
                       pThreadInfo->sml_tags_json_array[tableSeq]);
×
2441
        if (n < 0 || n >= line_buf_len - len) {
×
2442
            errorPrint("%s() LN%d snprintf overflow on %d\n",
×
2443
                       __func__, __LINE__, j);
2444
            return -1;
×
2445
        } else {
2446
            len += n;
×
2447
        }
UNCOV
2448
        n = snprintf(line + len, line_buf_len - len,
×
2449
                       "\"metric\":\"%s\"}", stbInfo->stbName);
2450
        if (n < 0 || n >= line_buf_len - len) {
×
UNCOV
2451
            errorPrint("%s() LN%d snprintf overflow on %d\n",
×
2452
                       __func__, __LINE__, j);
2453
            return -1;
×
2454
        } else {
2455
            len += n;
×
2456
        }
2457

2458
        pos++;
×
UNCOV
2459
        if (pos >= g_arguments->prepared_rand) {
×
2460
            pos = 0;
×
2461
        }
2462

2463
        // primay key repeat ts count
UNCOV
2464
        if (!stbInfo->primary_key || needChangeTs(stbInfo, pkCur, pkCnt)) {
×
2465
            *timestamp += stbInfo->timestamp_step;
×
2466
        }
2467

2468
        if (stbInfo->disorderRatio > 0) {
×
2469
            makeTimestampDisorder(timestamp, stbInfo);
×
2470
        }
2471
        generated++;
×
UNCOV
2472
        if (i + generated >= stbInfo->insertRows) {
×
2473
            break;
×
2474
        }
2475
        if ((j+1) < g_arguments->reqPerReq) {
×
UNCOV
2476
            strncat(line + len, ",", 2);
×
2477
            len += 1;
×
2478
        }
2479
    }
2480
    strncat(line + len, "]", 2);
×
2481

2482
    debugPrint("%s() LN%d, lines[0]: %s\n",
×
2483
               __func__, __LINE__, pThreadInfo->lines[0]);
2484
    return generated;
×
2485
}
2486

UNCOV
2487
static int32_t prepareProgressDataSmlJson(
×
2488
    threadInfo *pThreadInfo,
2489
    uint64_t tableSeq,
2490
    int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt) {
2491
    // prepareProgressDataSmlJson
UNCOV
2492
    SDataBase *  database = pThreadInfo->dbInfo;
×
2493
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
×
UNCOV
2494
    int32_t generated = 0;
×
2495

2496
    int32_t pos = 0;
×
2497
    for (int j = 0; (j < g_arguments->reqPerReq)
×
2498
            && !g_arguments->terminate; j++) {
×
UNCOV
2499
        tools_cJSON *tag = tools_cJSON_Duplicate(
×
UNCOV
2500
                tools_cJSON_GetArrayItem(
×
UNCOV
2501
                    pThreadInfo->sml_json_tags,
×
2502
                    (int)tableSeq -
×
2503
                    pThreadInfo->start_table_from),
×
2504
                true);
UNCOV
2505
        debugPrintJsonNoTime(tag);
×
2506
        generateSmlTaosJsonCols(
×
2507
                pThreadInfo->json_array, tag, stbInfo,
UNCOV
2508
                database->sml_precision, *timestamp);
×
2509
        pos++;
×
2510
        if (pos >= g_arguments->prepared_rand) {
×
2511
            pos = 0;
×
2512
        }
2513

2514
        // primay key repeat ts count
2515
        if (!stbInfo->primary_key || needChangeTs(stbInfo, pkCur, pkCnt)) {
×
UNCOV
2516
            *timestamp += stbInfo->timestamp_step;
×
2517
        }
2518

UNCOV
2519
        if (stbInfo->disorderRatio > 0) {
×
2520
            makeTimestampDisorder(timestamp, stbInfo);
×
2521
        }
2522
        generated++;
×
UNCOV
2523
        if (i + generated >= stbInfo->insertRows) {
×
UNCOV
2524
            break;
×
2525
        }
2526
    }
2527

UNCOV
2528
    tmfree(pThreadInfo->lines[0]);
×
UNCOV
2529
    pThreadInfo->lines[0] = NULL;
×
2530
    pThreadInfo->lines[0] =
×
2531
            tools_cJSON_PrintUnformatted(
×
2532
                pThreadInfo->json_array);
×
UNCOV
2533
    debugPrint("pThreadInfo->lines[0]: %s\n",
×
2534
                   pThreadInfo->lines[0]);
2535

2536
    return generated;
×
2537
}
2538

2539
static int32_t prepareProgressDataSmlLineOrTelnet(
×
2540
    threadInfo *pThreadInfo, uint64_t tableSeq, char *sampleDataBuf,
2541
    int64_t *timestamp, uint64_t i, char *ttl, int protocol, int32_t *pkCur, int32_t *pkCnt) {
2542
    // prepareProgressDataSmlLine
2543
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
×
2544
    int32_t generated = 0;
×
2545

2546
    int32_t pos = 0;
×
2547
    for (int j = 0; (j < g_arguments->reqPerReq)
×
2548
            && !g_arguments->terminate; j++) {
×
2549
        // table index
UNCOV
2550
        int ti = tableSeq - pThreadInfo->start_table_from;
×
UNCOV
2551
        if (TSDB_SML_LINE_PROTOCOL == protocol) {
×
UNCOV
2552
            snprintf(
×
2553
                    pThreadInfo->lines[j],
×
2554
                    stbInfo->lenOfCols + stbInfo->lenOfTags,
×
2555
                    "%s %s %" PRId64,
UNCOV
2556
                    pThreadInfo->sml_tags[ti],
×
2557
                    sampleDataBuf + pos * stbInfo->lenOfCols,
×
2558
                    *timestamp);
2559
        } else {
2560
            snprintf(
×
2561
                    pThreadInfo->lines[j],
×
2562
                    stbInfo->lenOfCols + stbInfo->lenOfTags,
×
2563
                    "%s %" PRId64 " %s %s", stbInfo->stbName,
2564
                    *timestamp,
2565
                    sampleDataBuf
2566
                    + pos * stbInfo->lenOfCols,
×
2567
                    pThreadInfo->sml_tags[ti]);
×
2568
        }
2569
        //infoPrint("sml prepare j=%d stb=%s sml_tags=%s \n", j, stbInfo->stbName, pThreadInfo->sml_tags[ti]);
2570
        pos++;
×
2571
        if (pos >= g_arguments->prepared_rand) {
×
UNCOV
2572
            pos = 0;
×
2573
        }
2574
        // primay key repeat ts count
UNCOV
2575
        if (!stbInfo->primary_key || needChangeTs(stbInfo, pkCur, pkCnt)) {
×
UNCOV
2576
            *timestamp += stbInfo->timestamp_step;
×
2577
        }
2578
        
UNCOV
2579
        if (stbInfo->disorderRatio > 0) {
×
UNCOV
2580
            makeTimestampDisorder(timestamp, stbInfo);
×
2581
        }
2582
        generated++;
×
UNCOV
2583
        if (i + generated >= stbInfo->insertRows) {
×
2584
            break;
×
2585
        }
2586
    }
UNCOV
2587
    return generated;
×
2588
}
2589

2590
static int32_t prepareProgressDataSml(
×
2591
    threadInfo *pThreadInfo,
2592
    SChildTable *childTbl,
2593
    uint64_t tableSeq,
2594
    int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt) {
2595
    // prepareProgressDataSml
UNCOV
2596
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
×
2597

2598
    char *sampleDataBuf;
2599
    if (childTbl->useOwnSample) {
×
2600
        sampleDataBuf = childTbl->sampleDataBuf;
×
2601
    } else {
UNCOV
2602
        sampleDataBuf = stbInfo->sampleDataBuf;
×
2603
    }
2604
    int protocol = stbInfo->lineProtocol;
×
2605
    int32_t generated = -1;
×
UNCOV
2606
    switch (protocol) {
×
UNCOV
2607
        case TSDB_SML_LINE_PROTOCOL:
×
2608
        case TSDB_SML_TELNET_PROTOCOL:
2609
            generated = prepareProgressDataSmlLineOrTelnet(
×
2610
                    pThreadInfo,
2611
                    tableSeq,
2612
                    sampleDataBuf,
2613
                    timestamp, i, ttl, protocol, pkCur, pkCnt);
2614
            break;
×
UNCOV
2615
        case TSDB_SML_JSON_PROTOCOL:
×
UNCOV
2616
            generated = prepareProgressDataSmlJsonText(
×
2617
                    pThreadInfo,
2618
                    tableSeq - pThreadInfo->start_table_from,
×
2619
                timestamp, i, ttl, pkCur, pkCnt);
2620
            break;
×
2621
        case SML_JSON_TAOS_FORMAT:
×
2622
            generated = prepareProgressDataSmlJson(
×
2623
                    pThreadInfo,
2624
                    tableSeq,
2625
                    timestamp, i, ttl, pkCur, pkCnt);
UNCOV
2626
            break;
×
UNCOV
2627
        default:
×
2628
            errorPrint("%s() LN%d: unknown protcolor: %d\n",
×
2629
                       __func__, __LINE__, protocol);
UNCOV
2630
            break;
×
2631
    }
2632

UNCOV
2633
    return generated;
×
2634
}
2635

2636
// if return true, timestmap must add timestap_step, else timestamp no need changed
2637
bool needChangeTs(SSuperTable * stbInfo, int32_t *pkCur, int32_t *pkCnt) {
×
2638
    // check need generate cnt
UNCOV
2639
    if(*pkCnt == 0) {
×
2640
        if (stbInfo->repeat_ts_min >= stbInfo->repeat_ts_max) {
×
2641
            // fixed count value is max
2642
            if (stbInfo->repeat_ts_max == 0){
×
2643
                return true;
×
2644
            }
2645

UNCOV
2646
            *pkCnt = stbInfo->repeat_ts_max;
×
2647
        } else {
2648
            // random range
UNCOV
2649
            *pkCnt = RD(stbInfo->repeat_ts_max + 1);
×
UNCOV
2650
            if(*pkCnt < stbInfo->repeat_ts_min) {
×
UNCOV
2651
                *pkCnt = (*pkCnt + stbInfo->repeat_ts_min) % stbInfo->repeat_ts_max;
×
2652
            }
2653
        }
2654
    }
2655

2656
    // compare with current value
UNCOV
2657
    *pkCur = *pkCur + 1;
×
2658
    if(*pkCur >= *pkCnt) {
×
2659
        // reset zero
2660
        *pkCur = 0;
×
UNCOV
2661
        *pkCnt = 0;
×
UNCOV
2662
        return true;
×
2663
    } else {
2664
        // add one
2665
        return false;
×
2666
    }
2667
}
2668

UNCOV
2669
static int32_t prepareProgressDataSql(
×
2670
                    threadInfo *pThreadInfo,
2671
                    SChildTable *childTbl, 
2672
                    char* tagData,
2673
                    uint64_t tableSeq,
2674
                    char *sampleDataBuf,
2675
                    int64_t *timestamp, uint64_t i, char *ttl,
2676
                    int32_t *pos, uint64_t *len, int32_t* pkCur, int32_t* pkCnt) {
2677
    // prepareProgressDataSql
2678
    int32_t generated = 0;
×
UNCOV
2679
    SDataBase *database = pThreadInfo->dbInfo;
×
2680
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
×
2681
    char *  pstr = pThreadInfo->buffer;
×
UNCOV
2682
    int disorderRange = stbInfo->disorderRange;
×
2683

2684
    if (stbInfo->partialColNum == stbInfo->cols->size) {
×
UNCOV
2685
        if (stbInfo->autoTblCreating) {
×
UNCOV
2686
            *len =
×
2687
                snprintf(pstr, TSDB_MAX_ALLOWED_SQL_LEN,
×
2688
                        g_arguments->escape_character
×
2689
                        ? "%s `%s`.`%s` USING `%s`.`%s` TAGS (%s) %s VALUES "
2690
                        : "%s %s.%s USING %s.%s TAGS (%s) %s VALUES ",
2691
                         STR_INSERT_INTO, database->dbName,
2692
                         childTbl->name, database->dbName,
2693
                         stbInfo->stbName,
2694
                         tagData +
2695
                         stbInfo->lenOfTags * tableSeq, ttl);
×
2696
        } else {
UNCOV
2697
            *len = snprintf(pstr, TSDB_MAX_ALLOWED_SQL_LEN,
×
2698
                    g_arguments->escape_character
×
2699
                           ? "%s `%s`.`%s` VALUES "
2700
                           : "%s %s.%s VALUES ",
2701
                           STR_INSERT_INTO,
2702
                           database->dbName, childTbl->name);
2703
        }
2704
    } else {
UNCOV
2705
        if (stbInfo->autoTblCreating) {
×
UNCOV
2706
            *len = snprintf(
×
2707
                    pstr, TSDB_MAX_ALLOWED_SQL_LEN,
UNCOV
2708
                    g_arguments->escape_character
×
2709
                    ? "%s `%s`.`%s` (%s) USING `%s`.`%s` TAGS (%s) %s VALUES "
2710
                    : "%s %s.%s (%s) USING %s.%s TAGS (%s) %s VALUES ",
2711
                    STR_INSERT_INTO, database->dbName,
2712
                    childTbl->name,
2713
                    stbInfo->partialColNameBuf,
2714
                    database->dbName, stbInfo->stbName,
2715
                    tagData +
2716
                    stbInfo->lenOfTags * tableSeq, ttl);
×
2717
        } else {
2718
            *len = snprintf(pstr, TSDB_MAX_ALLOWED_SQL_LEN,
×
2719
                    g_arguments->escape_character
×
2720
                    ? "%s `%s`.`%s` (%s) VALUES "
2721
                    : "%s %s.%s (%s) VALUES ",
2722
                    STR_INSERT_INTO, database->dbName,
2723
                    childTbl->name,
2724
                    stbInfo->partialColNameBuf);
2725
        }
2726
    }
2727

2728
    char *ownSampleDataBuf;
UNCOV
2729
    if (childTbl->useOwnSample) {
×
UNCOV
2730
        debugPrint("%s is using own sample data\n",
×
2731
                  childTbl->name);
UNCOV
2732
        ownSampleDataBuf = childTbl->sampleDataBuf;
×
2733
    } else {
UNCOV
2734
        ownSampleDataBuf = stbInfo->sampleDataBuf;
×
2735
    }
2736
    for (int j = 0; j < g_arguments->reqPerReq; j++) {
×
UNCOV
2737
        if (stbInfo->useSampleTs
×
UNCOV
2738
                && (!stbInfo->random_data_source)) {
×
UNCOV
2739
            *len +=
×
UNCOV
2740
                snprintf(pstr + *len,
×
UNCOV
2741
                         TSDB_MAX_ALLOWED_SQL_LEN - *len, "(%s)",
×
2742
                         sampleDataBuf +
2743
                         *pos * stbInfo->lenOfCols);
×
2744
        } else {
UNCOV
2745
            int64_t disorderTs = getDisorderTs(stbInfo, &disorderRange);
×
2746
            *len += snprintf(pstr + *len,
×
UNCOV
2747
                            TSDB_MAX_ALLOWED_SQL_LEN - *len,
×
2748
                            "(%" PRId64 ",%s)",
2749
                            disorderTs?disorderTs:*timestamp,
2750
                            ownSampleDataBuf +
UNCOV
2751
                            *pos * stbInfo->lenOfCols);
×
2752
        }
UNCOV
2753
        *pos += 1;
×
2754
        if (*pos >= g_arguments->prepared_rand) {
×
UNCOV
2755
            *pos = 0;
×
2756
        }
2757
        // primary key
UNCOV
2758
        if(!stbInfo->primary_key || needChangeTs(stbInfo, pkCur, pkCnt)) {
×
UNCOV
2759
            *timestamp += stbInfo->timestamp_step;
×
2760
        }
2761
   
UNCOV
2762
        generated++;
×
UNCOV
2763
        if (*len > (TSDB_MAX_ALLOWED_SQL_LEN
×
UNCOV
2764
            - stbInfo->lenOfCols)) {
×
UNCOV
2765
            break;
×
2766
        }
2767
        if (i + generated >= stbInfo->insertRows) {
×
2768
            break;
×
2769
        }
2770
    }
2771

2772
    return generated;
×
2773
}
2774

2775
void *syncWriteProgressive(void *sarg) {
×
2776
    threadInfo * pThreadInfo = (threadInfo *)sarg;
×
2777
    SDataBase *  database = pThreadInfo->dbInfo;
×
2778
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
×
2779

UNCOV
2780
    loadChildTableInfo(pThreadInfo);
×
2781

2782
    // special deal flow for TAOSC_IFACE
2783
    if (insertDataMix(pThreadInfo, database, stbInfo)) {
×
2784
        // request be dealt by this function , so return
2785
        return NULL;
×
2786
    }
2787

UNCOV
2788
    infoPrint(
×
2789
        "thread[%d] start progressive inserting into table from "
2790
        "%" PRIu64 " to %" PRIu64 "\n",
2791
        pThreadInfo->threadID, pThreadInfo->start_table_from,
2792
        pThreadInfo->end_table_to + 1);
2793

UNCOV
2794
    uint64_t  lastPrintTime = toolsGetTimestampMs();
×
UNCOV
2795
    uint64_t  lastTotalInsertRows = 0;
×
2796
    int64_t   startTs = toolsGetTimestampUs();
×
2797
    int64_t   endTs;
2798

UNCOV
2799
    FILE* csvFile = NULL;
×
2800
    char* tagData = NULL;
×
2801
    bool  stmt    = (stbInfo->iface == STMT_IFACE || stbInfo->iface == STMT2_IFACE) && stbInfo->autoTblCreating;
×
2802
    bool  smart   = SMART_IF_FAILED == stbInfo->continueIfFail;
×
2803
    bool  acreate = (stbInfo->iface == TAOSC_IFACE || stbInfo->iface == REST_IFACE) && stbInfo->autoTblCreating;
×
UNCOV
2804
    int   w       = 0;
×
2805
    if (stmt || smart || acreate) {
×
2806
        csvFile = openTagCsv(stbInfo, pThreadInfo->start_table_from);
×
UNCOV
2807
        tagData = benchCalloc(TAG_BATCH_COUNT, stbInfo->lenOfTags, false);
×
2808
    }
2809

2810
    bool oldInitStmt = stbInfo->autoTblCreating;
×
2811
    // stmt.  not auto table create call on stmt
UNCOV
2812
    if (stbInfo->iface == STMT_IFACE && !oldInitStmt) {
×
2813
        if (prepareStmt(pThreadInfo->conn->stmt, stbInfo, tagData, w, database->dbName)) {
×
2814
            g_fail = true;
×
2815
            goto free_of_progressive;
×
2816
        }
2817
    }
2818
    else if (stbInfo->iface == STMT2_IFACE && !stbInfo->autoTblCreating) {
×
UNCOV
2819
        if (prepareStmt2(pThreadInfo->conn->stmt2, stbInfo, tagData, w, database->dbName)) {
×
UNCOV
2820
            g_fail = true;
×
2821
            goto free_of_progressive;
×
2822
        }
2823
    }
2824
    
2825
    //
2826
    // loop write each child table
2827
    //
UNCOV
2828
    int16_t index = pThreadInfo->start_table_from;
×
UNCOV
2829
    for (uint64_t tableSeq = pThreadInfo->start_table_from;
×
UNCOV
2830
            tableSeq <= pThreadInfo->end_table_to; tableSeq++) {
×
2831
        char *sampleDataBuf;
2832
        SChildTable *childTbl;
2833

2834
        if (g_arguments->bind_vgroup) {
×
UNCOV
2835
            childTbl = pThreadInfo->vg->childTblArray[tableSeq];
×
2836
        } else {
2837
            childTbl = stbInfo->childTblArray[tableSeq];
×
2838
        }
2839
        debugPrint("tableSeq=%"PRId64" childTbl->name=%s\n", tableSeq, childTbl->name);
×
2840

2841
        if (childTbl->useOwnSample) {
×
2842
            sampleDataBuf = childTbl->sampleDataBuf;
×
2843
        } else {
2844
            sampleDataBuf = stbInfo->sampleDataBuf;
×
2845
        }
2846

UNCOV
2847
        int64_t  timestamp = pThreadInfo->start_time;
×
2848
        uint64_t len = 0;
×
UNCOV
2849
        int32_t pos = 0;
×
2850
        int32_t pkCur = 0; // record generate same timestamp current count
×
2851
        int32_t pkCnt = 0; // record generate same timestamp count
×
2852
        int64_t delay1 = 0;
×
2853
        int64_t delay2 = 0;
×
UNCOV
2854
        int64_t delay3 = 0;
×
2855

2856
        if(stmt || smart || acreate) {
×
2857
            // generator
2858
            if (w == 0) {
×
2859
                if(!generateTagData(stbInfo, tagData, TAG_BATCH_COUNT, csvFile, NULL, index)) {
×
UNCOV
2860
                    g_fail = true;
×
UNCOV
2861
                    goto free_of_progressive;
×
2862
                }
2863
            }
2864
        }
2865

2866
        // old init stmt must call for each table
2867
        if (stbInfo->iface == STMT_IFACE && oldInitStmt) {
×
2868
            if (prepareStmt(pThreadInfo->conn->stmt, stbInfo, tagData, w, database->dbName)) {
×
UNCOV
2869
                g_fail = true;
×
UNCOV
2870
                goto free_of_progressive;
×
2871
            }
2872
        }
2873
        else if (stbInfo->iface == STMT2_IFACE && stbInfo->autoTblCreating) {
×
UNCOV
2874
            if (prepareStmt2(pThreadInfo->conn->stmt2, stbInfo, tagData, w, database->dbName)) {
×
2875
                g_fail = true;
×
UNCOV
2876
                goto free_of_progressive;
×
2877
            }
2878
        }
2879
        
2880
        if(stmt || smart || acreate) {
×
2881
            // move next
2882
            if (++w >= TAG_BATCH_COUNT) {
×
2883
                // reset for gen again
UNCOV
2884
                w = 0;
×
2885
                index += TAG_BATCH_COUNT;
×
2886
            } 
2887
        }
2888

2889
        char ttl[SMALL_BUFF_LEN] = "";
×
2890
        if (stbInfo->ttl != 0) {
×
2891
            snprintf(ttl, SMALL_BUFF_LEN, "TTL %d", stbInfo->ttl);
×
2892
        }
UNCOV
2893
        for (uint64_t i = 0; i < stbInfo->insertRows;) {
×
2894
            if (g_arguments->terminate) {
×
UNCOV
2895
                goto free_of_progressive;
×
2896
            }
2897
            int32_t generated = 0;
×
2898
            switch (stbInfo->iface) {
×
2899
                case TAOSC_IFACE:
×
2900
                case REST_IFACE:                
UNCOV
2901
                    generated = prepareProgressDataSql(
×
2902
                            pThreadInfo,
2903
                            childTbl,
2904
                            tagData,
2905
                            w,
2906
                            sampleDataBuf,
2907
                            &timestamp, i, ttl, &pos, &len, &pkCur, &pkCnt);
2908
                    break;        
×
UNCOV
2909
                case STMT_IFACE: {
×
UNCOV
2910
                    generated = prepareProgressDataStmt(
×
2911
                            pThreadInfo,
2912
                            childTbl, &timestamp, i, ttl, &pkCur, &pkCnt, &delay1, &delay2, &delay3);
2913
                    break;
×
2914
                }
UNCOV
2915
                case STMT2_IFACE: {
×
UNCOV
2916
                    generated = stmt2BindAndSubmit(
×
2917
                            pThreadInfo,
2918
                            childTbl, &timestamp, i, ttl, &pkCur, &pkCnt, &delay1,
2919
                            &delay3, &startTs, &endTs, w);
2920
                    break;
×
2921
                }
2922
                case SML_REST_IFACE:
×
2923
                case SML_IFACE:
UNCOV
2924
                    generated = prepareProgressDataSml(
×
2925
                            pThreadInfo,
2926
                            childTbl,
2927
                            tableSeq, &timestamp, i, ttl, &pkCur, &pkCnt);
2928
                    break;
×
2929
                default:
×
UNCOV
2930
                    break;
×
2931
            }
2932
            if (generated < 0) {
×
2933
                g_fail = true;
×
UNCOV
2934
                goto free_of_progressive;
×
2935
            }
2936
            if (!stbInfo->non_stop) {
×
2937
                i += generated;
×
2938
            }
2939

2940
            // stmt2 execInsert already execute on stmt2BindAndSubmit
UNCOV
2941
            if (stbInfo->iface != STMT2_IFACE) {
×
2942
                // no stmt2 exec
UNCOV
2943
                startTs = toolsGetTimestampUs();
×
UNCOV
2944
                int code = execInsert(pThreadInfo, generated, &delay3);
×
UNCOV
2945
                if (code) {
×
2946
                    if (NO_IF_FAILED == stbInfo->continueIfFail) {
×
2947
                        warnPrint("The super table parameter "
×
2948
                                "continueIfFail: %d, STOP insertion!\n",
2949
                                stbInfo->continueIfFail);
UNCOV
2950
                        g_fail = true;
×
2951
                        goto free_of_progressive;
×
UNCOV
2952
                    } else if (YES_IF_FAILED == stbInfo->continueIfFail) {
×
2953
                        infoPrint("The super table parameter "
×
2954
                                "continueIfFail: %d, "
2955
                                "will continue to insert ..\n",
2956
                                stbInfo->continueIfFail);
UNCOV
2957
                    } else if (smart) {
×
2958
                        warnPrint("The super table parameter "
×
2959
                                "continueIfFail: %d, will create table "
2960
                                "then insert ..\n",
2961
                                stbInfo->continueIfFail);
2962

2963
                        // generator
UNCOV
2964
                        if (w == 0) {
×
UNCOV
2965
                            if(!generateTagData(stbInfo, tagData, TAG_BATCH_COUNT, csvFile, NULL, index)) {
×
2966
                                g_fail = true;
×
2967
                                goto free_of_progressive;
×
2968
                            }
2969
                        }
2970

2971
                        code = smartContinueIfFail(
×
2972
                                pThreadInfo,
2973
                                childTbl, tagData, w, ttl);
2974
                        if (0 != code) {
×
2975
                            g_fail = true;
×
UNCOV
2976
                            goto free_of_progressive;
×
2977
                        }
2978

2979
                        // move next
UNCOV
2980
                        if (++w >= TAG_BATCH_COUNT) {
×
2981
                            // reset for gen again
2982
                            w = 0;
×
2983
                            index += TAG_BATCH_COUNT;
×
2984
                        }
2985

UNCOV
2986
                        code = execInsert(pThreadInfo, generated, &delay3);
×
UNCOV
2987
                        if (code) {
×
2988
                            g_fail = true;
×
2989
                            goto free_of_progressive;
×
2990
                        }
2991
                    } else {
UNCOV
2992
                        warnPrint("Unknown super table parameter "
×
2993
                                "continueIfFail: %d\n",
2994
                                stbInfo->continueIfFail);
2995
                        g_fail = true;
×
2996
                        goto free_of_progressive;
×
2997
                    }
2998
                }
UNCOV
2999
                endTs = toolsGetTimestampUs() + 1;
×
3000
            }
3001

3002
            if (stbInfo->insert_interval > 0) {
×
3003
                debugPrint("%s() LN%d, insert_interval: %"PRIu64"\n",
×
3004
                          __func__, __LINE__, stbInfo->insert_interval);
3005
                perfPrint("sleep %" PRIu64 " ms\n",
×
3006
                              stbInfo->insert_interval);
UNCOV
3007
                toolsMsleep((int32_t)stbInfo->insert_interval);
×
3008
            }
3009

3010
            // flush
UNCOV
3011
            if (database->flush) {
×
3012
                char sql[260] = "";
×
3013
                sprintf(sql, "flush database %s", database->dbName);
×
3014
                int32_t code = executeSql(pThreadInfo->conn->taos,sql);
×
UNCOV
3015
                if (code != 0) {
×
UNCOV
3016
                  perfPrint(" %s failed. error code = 0x%x\n", sql, code);
×
3017
                } else {
3018
                   perfPrint(" %s ok.\n", sql);
×
3019
                }
3020
            }
3021

UNCOV
3022
            pThreadInfo->totalInsertRows += generated;
×
3023

3024
            if (g_arguments->terminate) {
×
3025
                goto free_of_progressive;
×
3026
            }
3027
            int protocol = stbInfo->lineProtocol;
×
UNCOV
3028
            switch (stbInfo->iface) {
×
UNCOV
3029
                case REST_IFACE:
×
3030
                case TAOSC_IFACE:
UNCOV
3031
                    memset(pThreadInfo->buffer, 0, pThreadInfo->max_sql_len);
×
UNCOV
3032
                    break;
×
3033
                case SML_REST_IFACE:
×
3034
                    memset(pThreadInfo->buffer, 0,
×
UNCOV
3035
                           g_arguments->reqPerReq *
×
UNCOV
3036
                               (pThreadInfo->max_sql_len + 1));                    
×
3037
                case SML_IFACE:
×
UNCOV
3038
                    if (TSDB_SML_JSON_PROTOCOL == protocol) {
×
UNCOV
3039
                        memset(pThreadInfo->lines[0], 0,
×
3040
                           pThreadInfo->line_buf_len);
×
3041
                    } else if (SML_JSON_TAOS_FORMAT == protocol) {
×
UNCOV
3042
                        if (pThreadInfo->lines && pThreadInfo->lines[0]) {
×
3043
                            tmfree(pThreadInfo->lines[0]);
×
UNCOV
3044
                            pThreadInfo->lines[0] = NULL;
×
3045
                        }
UNCOV
3046
                        if (pThreadInfo->json_array) {
×
UNCOV
3047
                            tools_cJSON_Delete(pThreadInfo->json_array);
×
UNCOV
3048
                            pThreadInfo->json_array = NULL;
×
3049
                        }
3050
                        pThreadInfo->json_array = tools_cJSON_CreateArray();
×
3051
                    } else {
3052
                        for (int j = 0; j < generated; j++) {
×
3053
                            debugPrint("pThreadInfo->lines[%d]: %s\n",
×
3054
                                       j, pThreadInfo->lines[j]);
UNCOV
3055
                            memset(pThreadInfo->lines[j], 0,
×
3056
                                   pThreadInfo->max_sql_len);
3057
                        }
3058
                    }
UNCOV
3059
                    break;
×
3060
                case STMT_IFACE:
×
UNCOV
3061
                    break;
×
3062
            }
3063

UNCOV
3064
            int64_t delay4 = endTs - startTs;
×
3065
            int64_t delay = delay1 + delay2 + delay3 + delay4;
×
3066
            if (delay <= 0) {
×
3067
                debugPrint("thread[%d]: startTs: %"PRId64", endTs: %"PRId64"\n",
×
3068
                        pThreadInfo->threadID, startTs, endTs);
3069
            } else {
3070
                perfPrint("insert execution time is %.6f s\n",
×
3071
                              delay / 1E6);
3072

3073
                int64_t * pDelay = benchCalloc(1, sizeof(int64_t), false);
×
3074
                *pDelay = delay;
×
3075
                if (benchArrayPush(pThreadInfo->delayList, pDelay) == NULL) {
×
3076
                    tmfree(pDelay);
×
3077
                }
3078
                pThreadInfo->totalDelay += delay;
×
3079
                pThreadInfo->totalDelay1 += delay1;
×
3080
                pThreadInfo->totalDelay2 += delay2;
×
3081
                pThreadInfo->totalDelay3 += delay3;
×
3082
            }
UNCOV
3083
            delay1 = delay2 = delay3 = 0;
×
3084

3085
            int64_t currentPrintTime = toolsGetTimestampMs();
×
3086
            if (currentPrintTime - lastPrintTime > 30 * 1000) {
×
UNCOV
3087
                infoPrint(
×
3088
                        "thread[%d] has currently inserted rows: "
3089
                        "%" PRId64 ", peroid insert rate: %.3f rows/s \n",
3090
                        pThreadInfo->threadID, pThreadInfo->totalInsertRows,
3091
                        (double)(pThreadInfo->totalInsertRows - lastTotalInsertRows) * 1000.0/(currentPrintTime - lastPrintTime));
UNCOV
3092
                lastPrintTime = currentPrintTime;
×
3093
                lastTotalInsertRows = pThreadInfo->totalInsertRows;
×
3094
            }
UNCOV
3095
            if (i >= stbInfo->insertRows) {
×
UNCOV
3096
                break;
×
3097
            }
3098
        }  // insertRows
3099
    }      // tableSeq
UNCOV
3100
free_of_progressive:
×
UNCOV
3101
    cleanupAndPrint(pThreadInfo, "progressive");
×
3102
    if(csvFile) {
×
3103
        fclose(csvFile);
×
3104
    }
3105
    tmfree(tagData);
×
UNCOV
3106
    return NULL;
×
3107
}
3108

UNCOV
3109
uint64_t strToTimestamp(char * tsStr) {
×
UNCOV
3110
    uint64_t ts = 0;
×
3111
    // remove double quota mark
3112
    if (tsStr[0] == '\"' || tsStr[0] == '\'') {
×
3113
        tsStr += 1;
×
3114
        int32_t last = strlen(tsStr) - 1;
×
UNCOV
3115
        if (tsStr[last] == '\"' || tsStr[0] == '\'') {
×
3116
            tsStr[last] = 0;
×
3117
        }
3118
    }
3119

UNCOV
3120
    if (toolsParseTime(tsStr, (int64_t*)&ts, strlen(tsStr), TSDB_TIME_PRECISION_MILLI, 0)) {
×
3121
        // not timestamp str format, maybe int64 format
UNCOV
3122
        ts = (int64_t)atol(tsStr);
×
3123
    }
3124

3125
    return ts;
×
3126
}
3127

UNCOV
3128
/*
 * Fill the per-column bind buffers from the textual sample data.
 *
 * For each of the g_arguments->prepared_rand sample rows, the row text
 * (one row every stbInfo->lenOfCols bytes) is split at ',' and every field
 * is converted according to the column type and stored at row index i of
 * that column's StmtData (is_null / lengths / data).
 *
 * childTbl     when non-NULL, the child table's own sample buffer and
 *              per-child column buffers are used; otherwise the super
 *              table's.
 * bind_ts_array when stbInfo->useSampleTs is set, the first field of each
 *              row is a timestamp and is stored here instead of in a
 *              column buffer.
 *
 * Returns 0 on success, -1 when a temporary buffer cannot be allocated.
 * DECIMAL columns are not supported and terminate the process.
 *
 * NOTE(review): fields are split at every ',' - a comma inside a quoted
 * string value would be treated as a separator.
 */
static int initStmtDataValue(SSuperTable *stbInfo, SChildTable *childTbl, uint64_t *bind_ts_array) {
    int32_t columnCount = stbInfo->cols->size;

    char *sampleDataBuf;
    if (childTbl) {
        sampleDataBuf = childTbl->sampleDataBuf;
    } else {
        sampleDataBuf = stbInfo->sampleDataBuf;
    }
    int64_t lenOfOneRow = stbInfo->lenOfCols;

    if (stbInfo->useSampleTs) {
        columnCount += 1;  // the leading timestamp field is an extra column
    }
    for (int i=0; i < g_arguments->prepared_rand; i++) {
        int cursor = 0;  // offset of the next field inside row i

        for (int c = 0; c < columnCount; c++) {
            char *restStr = sampleDataBuf
                + lenOfOneRow * i + cursor;
            int lengthOfRest = strlen(restStr);

            // length of the current field: up to the next ',' or the end
            int index = 0;
            for (index = 0; index < lengthOfRest; index++) {
                if (restStr[index] == ',') {
                    break;
                }
            }

            cursor += index + 1;  // skip ',' too

            // NUL-terminated private copy of the field (calloc zero-fills)
            char *tmpStr = calloc(1, index + 1);
            if (NULL == tmpStr) {
                errorPrint("%s() LN%d, Failed to allocate %d bind buffer\n",
                        __func__, __LINE__, index + 1);
                return -1;
            }

            strncpy(tmpStr, restStr, index);
            if ((0 == c) && stbInfo->useSampleTs) {
                // first field is the sample timestamp
                bind_ts_array[i] = strToTimestamp(tmpStr);
                free(tmpStr);
                continue;
            }

            // with a leading timestamp field, data column c maps to c-1
            Field *col = benchArrayGet(stbInfo->cols,
                    (stbInfo->useSampleTs?c-1:c));
            char dataType = col->type;

            StmtData *stmtData;
            if (childTbl) {
                ChildField *childCol =
                    benchArrayGet(childTbl->childCols,
                                  (stbInfo->useSampleTs?c-1:c));
                stmtData = &childCol->stmtData;
            } else {
                stmtData = &col->stmtData;
            }

            // set value
            stmtData->is_null[i] = 0;
            stmtData->lengths[i] = col->length;

            if (0 == strcmp(tmpStr, "NULL")) {
                *(stmtData->is_null + i) = true;
            } else {
                switch (dataType) {
                    case TSDB_DATA_TYPE_INT:
                    case TSDB_DATA_TYPE_UINT:
                        *((int32_t*)stmtData->data + i) = atoi(tmpStr);
                        break;
                    case TSDB_DATA_TYPE_FLOAT:
                        *((float*)stmtData->data +i) = (float)atof(tmpStr);
                        break;
                    case TSDB_DATA_TYPE_DOUBLE:
                        *((double*)stmtData->data + i) = atof(tmpStr);
                        break;
                    case TSDB_DATA_TYPE_TINYINT:
                    case TSDB_DATA_TYPE_UTINYINT:
                        *((int8_t*)stmtData->data + i) = (int8_t)atoi(tmpStr);
                        break;
                    case TSDB_DATA_TYPE_SMALLINT:
                    case TSDB_DATA_TYPE_USMALLINT:
                        *((int16_t*)stmtData->data + i) = (int16_t)atoi(tmpStr);
                        break;
                    case TSDB_DATA_TYPE_BIGINT:
                    case TSDB_DATA_TYPE_UBIGINT:
                        // strtoll: long may be only 32 bits wide
                        *((int64_t*)stmtData->data + i) = (int64_t)strtoll(tmpStr, NULL, 10);
                        break;
                    case TSDB_DATA_TYPE_BOOL:
                        *((int8_t*)stmtData->data + i) = (int8_t)atoi(tmpStr);
                        break;
                    case TSDB_DATA_TYPE_TIMESTAMP:
                        *((int64_t*)stmtData->data + i) = (int64_t)strtoll(tmpStr, NULL, 10);
                        break;
                    case TSDB_DATA_TYPE_BINARY:
                    case TSDB_DATA_TYPE_NCHAR:
                    case TSDB_DATA_TYPE_VARBINARY:
                    case TSDB_DATA_TYPE_GEOMETRY:
                        {
                            size_t tmpLen = strlen(tmpStr);
                            // The field carries one delimiter character on
                            // each side; the payload is what lies between.
                            // Computed without unsigned underflow for
                            // fields shorter than two characters.
                            size_t payloadLen = (tmpLen > 2) ? tmpLen - 2 : 0;
                            debugPrint("%s() LN%d, index: %d, "
                                    "tmpStr len: %"PRIu64", col->length: %d\n",
                                    __func__, __LINE__,
                                    i, (uint64_t)tmpLen, col->length);
                            if (payloadLen > (size_t)col->length) {
                                errorPrint("data length %"PRIu64" "
                                        "is larger than column length %d\n",
                                        (uint64_t)tmpLen, col->length);
                            }
                            if (tmpLen > 2) {
                                // copy the payload, truncated to the column
                                strncpy((char *)stmtData->data
                                            + i * col->length,
                                        tmpStr+1,
                                        min(col->length, tmpLen - 2));
                            } else {
                                // empty value: store a single NUL
                                strncpy((char *)stmtData->data
                                            + i*col->length,
                                        "", 1);
                            }
                        }
                        break;
                    case TSDB_DATA_TYPE_DECIMAL:
                    case TSDB_DATA_TYPE_DECIMAL64:
                        errorPrint("Not implemented data type in func initStmtDataValue: %s\n",
                                convertDatatypeToString(dataType));
                        exit(EXIT_FAILURE);
                    default:
                        break;
                }
            }
            free(tmpStr);
        }
    }
    return 0;
}
3265

UNCOV
3266
static void initStmtData(char dataType, void **data, uint32_t length) {
×
3267
    char *tmpP = NULL;
×
3268

UNCOV
3269
    switch (dataType) {
×
UNCOV
3270
        case TSDB_DATA_TYPE_INT:
×
3271
        case TSDB_DATA_TYPE_UINT:
3272
            tmpP = calloc(1, sizeof(int) * g_arguments->prepared_rand);
×
3273
            assert(tmpP);
×
UNCOV
3274
            tmfree(*data);
×
UNCOV
3275
            *data = (void*)tmpP;
×
UNCOV
3276
            break;
×
3277

3278
        case TSDB_DATA_TYPE_TINYINT:
×
3279
        case TSDB_DATA_TYPE_UTINYINT:
3280
            tmpP = calloc(1, sizeof(int8_t) * g_arguments->prepared_rand);
×
3281
            assert(tmpP);
×
UNCOV
3282
            tmfree(*data);
×
3283
            *data = (void*)tmpP;
×
3284
            break;
×
3285

UNCOV
3286
        case TSDB_DATA_TYPE_SMALLINT:
×
3287
        case TSDB_DATA_TYPE_USMALLINT:
3288
            tmpP = calloc(1, sizeof(int16_t) * g_arguments->prepared_rand);
×
3289
            assert(tmpP);
×
UNCOV
3290
            tmfree(*data);
×
3291
            *data = (void*)tmpP;
×
UNCOV
3292
            break;
×
3293

3294
        case TSDB_DATA_TYPE_BIGINT:
×
3295
        case TSDB_DATA_TYPE_UBIGINT:
UNCOV
3296
            tmpP = calloc(1, sizeof(int64_t) * g_arguments->prepared_rand);
×
UNCOV
3297
            assert(tmpP);
×
3298
            tmfree(*data);
×
UNCOV
3299
            *data = (void*)tmpP;
×
UNCOV
3300
            break;
×
3301

UNCOV
3302
        case TSDB_DATA_TYPE_BOOL:
×
UNCOV
3303
            tmpP = calloc(1, sizeof(int8_t) * g_arguments->prepared_rand);
×
3304
            assert(tmpP);
×
3305
            tmfree(*data);
×
UNCOV
3306
            *data = (void*)tmpP;
×
3307
            break;
×
3308

UNCOV
3309
        case TSDB_DATA_TYPE_FLOAT:
×
3310
            tmpP = calloc(1, sizeof(float) * g_arguments->prepared_rand);
×
3311
            assert(tmpP);
×
3312
            tmfree(*data);
×
3313
            *data = (void*)tmpP;
×
3314
            break;
×
3315

3316
        case TSDB_DATA_TYPE_DOUBLE:
×
UNCOV
3317
            tmpP = calloc(1, sizeof(double) * g_arguments->prepared_rand);
×
3318
            assert(tmpP);
×
3319
            tmfree(*data);
×
3320
            *data = (void*)tmpP;
×
3321
            break;
×
3322

UNCOV
3323
        case TSDB_DATA_TYPE_BINARY:
×
3324
        case TSDB_DATA_TYPE_NCHAR:
3325
        case TSDB_DATA_TYPE_VARBINARY:
3326
        case TSDB_DATA_TYPE_GEOMETRY:
3327
            tmpP = calloc(1, g_arguments->prepared_rand * length);
×
3328
            assert(tmpP);
×
3329
            tmfree(*data);
×
3330
            *data = (void*)tmpP;
×
UNCOV
3331
            break;
×
3332

UNCOV
3333
        case TSDB_DATA_TYPE_TIMESTAMP:
×
3334
            tmpP = calloc(1, sizeof(int64_t) * g_arguments->prepared_rand);
×
3335
            assert(tmpP);
×
3336
            tmfree(*data);
×
3337
            *data = (void*)tmpP;
×
3338
            break;
×
3339

3340
        case TSDB_DATA_TYPE_DECIMAL:
×
3341
        case TSDB_DATA_TYPE_DECIMAL64:
3342
            errorPrint("Not implemented data type in func initStmtData: %s\n",
×
3343
                       convertDatatypeToString(dataType));
3344
            exit(EXIT_FAILURE);
×
3345

UNCOV
3346
        default:
×
3347
            errorPrint("Unknown data type on initStmtData: %s\n",
×
3348
                       convertDatatypeToString(dataType));
3349
            exit(EXIT_FAILURE);
×
3350
    }
3351
}
×
3352

UNCOV
3353
static int parseBufferToStmtBatchChildTbl(SSuperTable *stbInfo,
×
3354
                                          SChildTable* childTbl, uint64_t *bind_ts_array) {
3355
    int32_t columnCount = stbInfo->cols->size;
×
3356

3357
    for (int c = 0; c < columnCount; c++) {
×
3358
        Field *col = benchArrayGet(stbInfo->cols, c);
×
3359
        ChildField *childCol = benchArrayGet(childTbl->childCols, c);
×
UNCOV
3360
        char dataType = col->type;
×
3361

3362
        // malloc memory
UNCOV
3363
        tmfree(childCol->stmtData.is_null);
×
UNCOV
3364
        tmfree(childCol->stmtData.lengths);
×
3365
        childCol->stmtData.is_null = benchCalloc(sizeof(char),     g_arguments->prepared_rand, true);
×
3366
        childCol->stmtData.lengths = benchCalloc(sizeof(int32_t),  g_arguments->prepared_rand, true);
×
3367

3368
        initStmtData(dataType, &(childCol->stmtData.data), col->length);
×
3369
    }
3370

3371
    return initStmtDataValue(stbInfo, childTbl, bind_ts_array);
×
3372
}
3373

3374
/*
 * Prepare the super table's own bind buffers: every column gets fresh
 * is_null / lengths arrays with g_arguments->prepared_rand entries and a
 * reallocated value buffer, after which the buffers are filled from the
 * super table's sample data.
 *
 * Returns the result of initStmtDataValue().
 */
static int parseBufferToStmtBatch(SSuperTable* stbInfo, uint64_t *bind_ts_array) {
    int32_t totalCols = stbInfo->cols->size;
    int colIdx = 0;

    while (colIdx < totalCols) {
        Field *field = benchArrayGet(stbInfo->cols, colIdx);

        // release whatever a previous run left behind ...
        tmfree(field->stmtData.is_null);
        tmfree(field->stmtData.lengths);

        // ... and allocate arrays with prepared_rand elements each
        field->stmtData.is_null = benchCalloc(sizeof(char), g_arguments->prepared_rand, false);
        field->stmtData.lengths = benchCalloc(sizeof(int32_t), g_arguments->prepared_rand, false);

        initStmtData(field->type, &(field->stmtData.data), field->length);

        colIdx++;
    }

    return initStmtDataValue(stbInfo, NULL, bind_ts_array);
}
3391

UNCOV
3392
/*
 * Name the child tables "<childTblPrefix><n>" for n = 0 .. childTblCount-1.
 * Each name is a strdup()'ed copy stored in childTblArray[n]->name.
 * Nothing is generated when stbInfo->useTagTableName is set.
 *
 * Returns stbInfo->childTblCount in both cases.
 */
static int64_t fillChildTblNameByCount(SSuperTable *stbInfo) {
    if (!stbInfo->useTagTableName) {
        int64_t seq = 0;
        while (seq < stbInfo->childTblCount) {
            char nameBuf[TSDB_TABLE_NAME_LEN]={0};
            snprintf(nameBuf, sizeof(nameBuf),
                     "%s%" PRIu64,
                     stbInfo->childTblPrefix, seq);
            stbInfo->childTblArray[seq]->name = strdup(nameBuf);
            debugPrint("%s(): %s\n", __func__,
                      stbInfo->childTblArray[seq]->name);
            seq++;
        }
    }

    return stbInfo->childTblCount;
}
3410

UNCOV
3411
static int64_t fillChildTblNameByFromTo(SDataBase *database,
×
3412
        SSuperTable* stbInfo) {
3413
    for (int64_t i = stbInfo->childTblFrom; i <= stbInfo->childTblTo; i++) {
×
UNCOV
3414
        char childName[TSDB_TABLE_NAME_LEN]={0};
×
UNCOV
3415
        snprintf(childName,
×
3416
                TSDB_TABLE_NAME_LEN,
3417
                "%s%" PRIu64,
3418
                stbInfo->childTblPrefix, i);
3419
        stbInfo->childTblArray[i]->name = strdup(childName);
×
3420
    }
3421

UNCOV
3422
    return (stbInfo->childTblTo-stbInfo->childTblFrom);
×
3423
}
3424

3425
static int64_t fillChildTblNameByLimitOffset(SDataBase *database,
×
3426
        SSuperTable* stbInfo) {
UNCOV
3427
    SBenchConn* conn = initBenchConn();
×
3428
    if (NULL == conn) {
×
UNCOV
3429
        return -1;
×
3430
    }
3431
    char cmd[SHORT_1K_SQL_BUFF_LEN] = "\0";
×
UNCOV
3432
    if (g_arguments->taosc_version == 3) {
×
UNCOV
3433
        snprintf(cmd, SHORT_1K_SQL_BUFF_LEN,
×
3434
                 "SELECT DISTINCT(TBNAME) FROM %s.`%s` LIMIT %" PRId64
3435
                 " OFFSET %" PRIu64,
3436
                 database->dbName, stbInfo->stbName, stbInfo->childTblLimit,
3437
                 stbInfo->childTblOffset);
3438
    } else {
3439
        snprintf(cmd, SHORT_1K_SQL_BUFF_LEN,
×
3440
                 "SELECT TBNAME FROM %s.`%s` LIMIT %" PRId64
3441
                 " OFFSET %" PRIu64,
3442
                 database->dbName, stbInfo->stbName, stbInfo->childTblLimit,
3443
                 stbInfo->childTblOffset);
3444
    }
3445
    debugPrint("cmd: %s\n", cmd);
×
3446
    TAOS_RES *res = taos_query(conn->taos, cmd);
×
UNCOV
3447
    int32_t   code = taos_errno(res);
×
UNCOV
3448
    int64_t   count = 0;
×
UNCOV
3449
    if (code) {
×
3450
        printErrCmdCodeStr(cmd, code, res);
×
UNCOV
3451
        closeBenchConn(conn);
×
UNCOV
3452
        return -1;
×
3453
    }
UNCOV
3454
    TAOS_ROW row = NULL;
×
3455
    while ((row = taos_fetch_row(res)) != NULL) {
×
3456
        int *lengths = taos_fetch_lengths(res);
×
3457
        char * childName = benchCalloc(1, lengths[0] + 1, true);
×
UNCOV
3458
        strncpy(childName, row[0], lengths[0]);
×
UNCOV
3459
        childName[lengths[0]] = '\0';
×
UNCOV
3460
        stbInfo->childTblArray[count]->name = childName;
×
3461
        debugPrint("stbInfo->childTblArray[%" PRId64 "]->name: %s\n",
×
3462
                   count, stbInfo->childTblArray[count]->name);
UNCOV
3463
        count++;
×
3464
    }
UNCOV
3465
    taos_free_result(res);
×
UNCOV
3466
    closeBenchConn(conn);
×
3467
    return count;
×
3468
}
3469

3470
/*
 * Reconcile interlaceRows, insertRows and the global records-per-request
 * value before inserting into a super table.
 *
 * May lower stbInfo->interlaceRows and g_arguments->reqPerReq, and exits the
 * process for the stmt + interlace + auto-create-table combination.
 */
static void preProcessArgument(SSuperTable *stbInfo) {
    // interlaceRows is capped at the number of records per request
    if (g_arguments->reqPerReq < stbInfo->interlaceRows) {
        infoPrint(
            "interlaceRows(%d) is larger than record per request(%u), which "
            "will be set to %u\n",
            stbInfo->interlaceRows,
            g_arguments->reqPerReq,
            g_arguments->reqPerReq);
        stbInfo->interlaceRows = g_arguments->reqPerReq;
    }

    // interlacing more rows than will be inserted turns interlace mode off
    if (stbInfo->insertRows < stbInfo->interlaceRows) {
        infoPrint(
                "interlaceRows larger than insertRows %d > %" PRId64 "\n",
                stbInfo->interlaceRows,
                stbInfo->insertRows);
        infoPrint("%s", "interlaceRows will be set to 0\n");
        stbInfo->interlaceRows = 0;
    }

    if (stbInfo->interlaceRows == 0) {
        // progressive mode: a request never needs more rows than insertRows
        if (stbInfo->insertRows < g_arguments->reqPerReq) {
            infoPrint("record per request (%u) is larger than "
                    "insert rows (%"PRIu64")"
                    " in progressive mode, which will be set to %"PRIu64"\n",
                    g_arguments->reqPerReq,
                    stbInfo->insertRows,
                    stbInfo->insertRows);
            g_arguments->reqPerReq = stbInfo->insertRows;
        }
    } else if (stbInfo->interlaceRows > 0
            && stbInfo->autoTblCreating
            && stbInfo->iface == STMT_IFACE) {
        // unsupported combination, abort the run
        errorPrint("%s","stmt not support autocreate table with interlace row , quit programe!\n");
        exit(-1);
    }
}
×
3504

3505
static int printTotalDelay(SDataBase *database,
×
3506
                           int64_t totalDelay,
3507
                           int64_t totalDelay1,
3508
                           int64_t totalDelay2,
3509
                           int64_t totalDelay3,
3510
                           BArray *total_delay_list,
3511
                            int threads,
3512
                            int64_t totalInsertRows,
3513
                            int64_t spend) {
3514
    // zero check
UNCOV
3515
    if (total_delay_list->size == 0 || spend == 0 || threads == 0) {
×
UNCOV
3516
        return -1;
×
3517
    }
3518

3519
    char subDelay[128] = "";
×
UNCOV
3520
    if(totalDelay1 + totalDelay2 + totalDelay3 > 0) {
×
UNCOV
3521
        sprintf(subDelay, " stmt delay1=%.2fs delay2=%.2fs delay3=%.2fs",
×
3522
                totalDelay1/threads/1E6,
×
3523
                totalDelay2/threads/1E6,
×
UNCOV
3524
                totalDelay3/threads/1E6);
×
3525
    }
3526

3527
    double time_cost = spend / 1E6;
×
UNCOV
3528
    double real_time_cost = totalDelay/threads/1E6;
×
UNCOV
3529
    double records_per_second = (double)(totalInsertRows / (spend/1E6));
×
3530
    double real_records_per_second = (double)(totalInsertRows / (totalDelay/threads/1E6));
×
3531

3532
    succPrint("Spent %.6f (real %.6f) seconds to insert rows: %" PRIu64
×
3533
              " with %d thread(s) into %s %.2f (real %.2f) records/second%s\n",
3534
              time_cost, real_time_cost, totalInsertRows, threads,
3535
              database->dbName, records_per_second,
3536
              real_records_per_second, subDelay);
3537

UNCOV
3538
    if (!total_delay_list->size) {
×
UNCOV
3539
        return -1;
×
3540
    }
3541
    
3542
    double minDelay = *(int64_t *)(benchArrayGet(total_delay_list, 0))/1E3;
×
3543
    double avgDelay = (double)totalDelay/total_delay_list->size/1E3;
×
UNCOV
3544
    double p90 = *(int64_t *)(benchArrayGet(total_delay_list,
×
3545
                                         (int32_t)(total_delay_list->size
×
UNCOV
3546
                                         * 0.9)))/1E3;
×
3547
    double p95 = *(int64_t *)(benchArrayGet(total_delay_list,
×
UNCOV
3548
                                         (int32_t)(total_delay_list->size
×
UNCOV
3549
                                         * 0.95)))/1E3;
×
UNCOV
3550
    double p99 = *(int64_t *)(benchArrayGet(total_delay_list,
×
UNCOV
3551
                                         (int32_t)(total_delay_list->size
×
UNCOV
3552
                                         * 0.99)))/1E3;
×
UNCOV
3553
    double maxDelay = *(int64_t *)(benchArrayGet(total_delay_list,
×
UNCOV
3554
                                         (int32_t)(total_delay_list->size
×
UNCOV
3555
                                         - 1)))/1E3;                                     
×
3556

3557
    succPrint("insert delay, "
×
3558
              "min: %.4fms, "
3559
              "avg: %.4fms, "
3560
              "p90: %.4fms, "
3561
              "p95: %.4fms, "
3562
              "p99: %.4fms, "
3563
              "max: %.4fms\n",
3564
            minDelay, avgDelay, p90, p95, p99, maxDelay);
3565
    
3566
    if (g_arguments->output_json_file) {
×
UNCOV
3567
        tools_cJSON *root = tools_cJSON_CreateObject();
×
UNCOV
3568
        if (root) {
×
3569
            tools_cJSON_AddStringToObject(root, "db_name", database->dbName);
×
3570
            tools_cJSON_AddNumberToObject(root, "inserted_rows", totalInsertRows);
×
3571
            tools_cJSON_AddNumberToObject(root, "threads", threads);
×
3572
            tools_cJSON_AddNumberToObject(root, "time_cost", time_cost);
×
UNCOV
3573
            tools_cJSON_AddNumberToObject(root, "real_time_cost", real_time_cost);
×
3574
            tools_cJSON_AddNumberToObject(root, "records_per_second",  records_per_second);
×
UNCOV
3575
            tools_cJSON_AddNumberToObject(root, "real_records_per_second", real_records_per_second);
×
3576
            
UNCOV
3577
            tools_cJSON_AddNumberToObject(root, "avg", avgDelay);
×
UNCOV
3578
            tools_cJSON_AddNumberToObject(root, "min", minDelay);
×
UNCOV
3579
            tools_cJSON_AddNumberToObject(root, "max", maxDelay);
×
3580
            tools_cJSON_AddNumberToObject(root, "p90", p90);
×
3581
            tools_cJSON_AddNumberToObject(root, "p95", p95);
×
UNCOV
3582
            tools_cJSON_AddNumberToObject(root, "p99", p99);
×
3583
            
3584
            char *jsonStr = tools_cJSON_PrintUnformatted(root);
×
3585
            if (jsonStr) {
×
3586
                FILE *fp = fopen(g_arguments->output_json_file, "w");
×
3587
                if (fp) {
×
3588
                    fprintf(fp, "%s\n", jsonStr);
×
3589
                    fclose(fp);
×
3590
                } else {
3591
                    errorPrint("Failed to open output JSON file, file name %s\n",
×
3592
                            g_arguments->output_json_file);
3593
                }
3594
                free(jsonStr);
×
3595
            }
3596
            tools_cJSON_Delete(root);
×
3597
        }
3598
    }        
3599
    return 0;
×
3600
}
3601

UNCOV
3602
/*
 * Pick the child table name source for an existing super table:
 * limit/offset query when childTblLimit is set, from/to range when either
 * childTblFrom or childTblTo is set, otherwise fill by count.
 *
 * Returns whatever the selected fill function returns.
 */
static int64_t fillChildTblNameImp(SDataBase *database, SSuperTable *stbInfo) {
    if (stbInfo->childTblLimit) {
        return fillChildTblNameByLimitOffset(database, stbInfo);
    }

    if (stbInfo->childTblFrom || stbInfo->childTblTo) {
        return fillChildTblNameByFromTo(database, stbInfo);
    }

    return fillChildTblNameByCount(stbInfo);
}
3613

3614
/*
 * Allocate stbInfo->childTblArray (childTblCount entries, each a zeroed
 * SChildTable) and fill in the child table names.
 *
 * Returns childTblCount for the single normal-table case, otherwise the
 * value returned by the fill function that was used.
 */
static int64_t fillChildTblName(SDataBase *database, SSuperTable *stbInfo) {
    stbInfo->childTblArray = benchCalloc(stbInfo->childTblCount,
            sizeof(SChildTable*), true);
    for (int64_t idx = 0; idx < stbInfo->childTblCount; idx++) {
        stbInfo->childTblArray[idx] =
            benchCalloc(1, sizeof(SChildTable), true);
    }

    // Normal table: exactly one table and no tags, it is named after the stb
    if (stbInfo->childTblCount == 1 && stbInfo->tags->size == 0) {
        char tbName[TSDB_TABLE_NAME_LEN]={0};
        snprintf(tbName, TSDB_TABLE_NAME_LEN,
                    "%s", stbInfo->stbName);
        stbInfo->childTblArray[0]->name = strdup(tbName);
        return stbInfo->childTblCount;
    }

    // existing child tables are only looked up for non-schemaless interfaces
    bool isSml = (stbInfo->iface == SML_IFACE
            || stbInfo->iface == SML_REST_IFACE);
    if (!isSml && stbInfo->childTblExists) {
        return fillChildTblNameImp(database, stbInfo);
    }

    return fillChildTblNameByCount(stbInfo);
}
3639

3640
// last ts fill to startFillbackTime
3641
/*
 * Run "select last(ts)" on the super table and store the result in
 * stbInfo->startFillbackTime.
 *
 * Returns true when a timestamp was read, false when the connection or the
 * query fails or the result holds no (non-NULL) value.
 */
static bool fillSTableLastTs(SDataBase *database, SSuperTable *stbInfo) {
    SBenchConn* conn = initBenchConn();
    if (NULL == conn) {
        return false;
    }

    char cmd[SHORT_1K_SQL_BUFF_LEN] = "\0";
    snprintf(cmd, SHORT_1K_SQL_BUFF_LEN, "select last(ts) from %s.`%s`", database->dbName, stbInfo->stbName);

    infoPrint("fillBackTime: %s\n", cmd);
    TAOS_RES *res = taos_query(conn->taos, cmd);
    int32_t   code = taos_errno(res);
    if (code) {
        // NOTE(review): res is not freed on this path; presumably
        // printErrCmdCodeStr releases it - confirm.
        printErrCmdCodeStr(cmd, code, res);
        closeBenchConn(conn);
        return false;
    }

    bool     found = false;
    TAOS_ROW row = taos_fetch_row(res);
    // a NULL column value is returned as a NULL pointer, never dereference it
    if (row != NULL && row[0] != NULL) {
        char lastTs[128];
        memset(lastTs, 0, sizeof(lastTs));

        stbInfo->startFillbackTime = *(int64_t*)row[0];
        toolsFormatTimestamp(lastTs, stbInfo->startFillbackTime, database->precision);
        infoPrint("fillBackTime: get ok %s.%s last ts=%s \n", database->dbName, stbInfo->stbName, lastTs);
        found = true;
    }

    taos_free_result(res);
    closeBenchConn(conn);

    return found;
}
3677

3678
// calcNow expression fill to timestamp_start
3679
/*
 * Evaluate the stbInfo->calcNow expression on the server ("select <expr>")
 * and store the resulting value in stbInfo->startTimestamp.
 *
 * Returns true when a value was read, false when the connection or the
 * query fails or the result holds no (non-NULL) value.
 */
static bool calcExprFromServer(SDataBase *database, SSuperTable *stbInfo) {
    SBenchConn* conn = initBenchConn();
    if (NULL == conn) {
        return false;
    }

    char cmd[SHORT_1K_SQL_BUFF_LEN] = "\0";
    snprintf(cmd, SHORT_1K_SQL_BUFF_LEN, "select %s", stbInfo->calcNow);

    infoPrint("calcExprFromServer: %s\n", cmd);
    TAOS_RES *res = taos_query(conn->taos, cmd);
    int32_t   code = taos_errno(res);
    if (code) {
        // NOTE(review): res is not freed on this path; presumably
        // printErrCmdCodeStr releases it - confirm.
        printErrCmdCodeStr(cmd, code, res);
        closeBenchConn(conn);
        return false;
    }

    bool     found = false;
    TAOS_ROW row = taos_fetch_row(res);
    // a NULL column value is returned as a NULL pointer, never dereference it
    if (row != NULL && row[0] != NULL) {
        char ts[128];
        memset(ts, 0, sizeof(ts));

        stbInfo->startTimestamp = *(int64_t*)row[0];
        toolsFormatTimestamp(ts, stbInfo->startTimestamp, database->precision);
        infoPrint("calcExprFromServer: get ok.  %s = %s \n", stbInfo->calcNow, ts);
        found = true;
    }

    taos_free_result(res);
    closeBenchConn(conn);

    return found;
}
3715

UNCOV
3716
/*
 * Work out how many tables take part in the insert:
 *   - childTblTo > 0                          -> childTblTo - childTblFrom
 *   - childTblLimit > 0 and childTblExists    -> childTblLimit
 *   - otherwise                               -> childTblCount
 * The database argument is not read.
 */
int64_t obtainTableCount(SDataBase* database, SSuperTable* stbInfo) {
    if (stbInfo->childTblTo > 0) {
        int64_t rangeCount = stbInfo->childTblTo - stbInfo->childTblFrom;
        return rangeCount;
    }

    if (stbInfo->childTblExists && stbInfo->childTblLimit > 0) {
        int64_t limitCount = stbInfo->childTblLimit;
        return limitCount;
    }

    int64_t configured = stbInfo->childTblCount;
    return configured;
}
3729

3730
// assign table to thread with vgroups, return assign thread count
3731
/*
 * Distribute the child tables of stbInfo over the vgroups of database using
 * calcGroupIndex(), in three passes: count tables per vgroup, allocate each
 * non-empty vgroup's childTblArray, then fill those arrays.
 *
 * Returns the number of vgroups that own at least one table, which is the
 * thread count to use.
 */
int32_t assignTableToThread(SDataBase* database, SSuperTable* stbInfo) {
    int32_t usedVgroups = 0;

    // pass 1: calc table count per vgroup
    for (int64_t tb = 0; tb < stbInfo->childTblCount; tb++) {
        int32_t vgIdx = calcGroupIndex(database->dbName, stbInfo->childTblArray[tb]->name, database->vgroups);
        if (vgIdx != -1) {
            SVGroup *vg = benchArrayGet(database->vgArray, vgIdx);
            vg->tbCountPerVgId ++;
        }
    }

    // pass 2: malloc vg->childTblArray memory with table count
    for (int v = 0; v < database->vgroups; v++) {
        SVGroup *vg = benchArrayGet(database->vgArray, v);
        infoPrint("Local hash calc %"PRId64" tables on %s's vgroup %d (id: %d)\n",
                    vg->tbCountPerVgId, database->dbName, v, vg->vgId);
        if (!vg->tbCountPerVgId) {
            // empty vgroup gets no array and no thread
            continue;
        }
        usedVgroups++;
        vg->childTblArray = benchCalloc(vg->tbCountPerVgId, sizeof(SChildTable*), true);
        vg->tbOffset      = 0;
    }

    // pass 3: set vg->childTblArray data
    for (int64_t tb = 0; tb < stbInfo->childTblCount; tb++) {
        SChildTable *child = stbInfo->childTblArray[tb];
        int32_t vgIdx = calcGroupIndex(database->dbName, child->name, database->vgroups);
        if (vgIdx != -1) {
            SVGroup *vg = benchArrayGet(database->vgArray, vgIdx);
            debugPrint("calc table hash to vgroup %s.%s vgIdx=%d\n",
                        database->dbName,
                        stbInfo->childTblArray[tb]->name, vgIdx);
            vg->childTblArray[vg->tbOffset] = child;
            vg->tbOffset++;
        }
    }

    return usedVgroups;
}
3773

3774
// init stmt
UNCOV
3775
/*
 * Create a stmt handle on the given connection.
 *
 * When single is true the handle is created through
 * taos_stmt_init_with_options() with singleStbInsert and
 * singleTableBindOnce enabled; otherwise plain taos_stmt_init() is used.
 * Returns whatever the init call returns.
 */
TAOS_STMT* initStmt(TAOS* taos, bool single) {
    if (single) {
        TAOS_STMT_OPTIONS options;
        memset(&options, 0, sizeof(options));
        options.singleStbInsert      = single;
        options.singleTableBindOnce  = single;
        infoPrint("initStmt call taos_stmt_init_with_options single=%d\n", single);
        return taos_stmt_init_with_options(taos, &options);
    }

    infoPrint("initStmt call taos_stmt_init single=%d\n", single);
    return taos_stmt_init(taos);
}
3788

3789
// init stmt2
UNCOV
3790
/*
 * Create a stmt2 handle on the given connection with singleStbInsert and
 * singleTableBindOnce both set to `single`, and log whether it succeeded.
 * Returns the handle from taos_stmt2_init(), which may be NULL.
 */
TAOS_STMT2* initStmt2(TAOS* taos, bool single) {
    TAOS_STMT2_OPTION options;
    memset(&options, 0, sizeof(options));
    options.singleStbInsert      = single;
    options.singleTableBindOnce  = single;

    TAOS_STMT2* handle = taos_stmt2_init(taos, &options);
    if (NULL == handle) {
        errorPrint("failed taos_stmt2_init single=%d\n", single);
    } else {
        succPrint("succ  taos_stmt2_init single=%d\n", single);
    }
    return handle;
}
3803

3804
// init ts array
UNCOV
3805
/*
 * Run parseBufferToStmtBatch() for the super table, then
 * parseBufferToStmtBatchChildTbl() for every child table that has
 * useOwnSample set, all with the same bind_ts_array.
 */
void initTsArray(uint64_t *bind_ts_array, SSuperTable* stbInfo) {
    parseBufferToStmtBatch(stbInfo, bind_ts_array);

    for (int64_t idx = 0; idx < stbInfo->childTblCount; idx++) {
        SChildTable *tbl = stbInfo->childTblArray[idx];
        if (!tbl->useOwnSample) {
            continue;
        }
        parseBufferToStmtBatchChildTbl(stbInfo, tbl, bind_ts_array);
    }
}
×
3815

UNCOV
3816
void *genInsertTheadInfo(void* arg) {
×
3817

3818
    if (g_arguments->terminate || g_fail) {
×
3819
        return NULL;
×
3820
    }
3821

UNCOV
3822
    threadInfo* pThreadInfo = (threadInfo*)arg;
×
UNCOV
3823
    SSuperTable *stbInfo = pThreadInfo->stbInfo;
×
3824
    pThreadInfo->delayList = benchArrayInit(1, sizeof(int64_t));
×
3825
    switch (stbInfo->iface) {
×
3826
        // rest
3827
        case REST_IFACE: {
×
3828
            if (stbInfo->interlaceRows > 0) {
×
UNCOV
3829
                pThreadInfo->buffer = new_ds(0);
×
3830
            } else {
UNCOV
3831
                pThreadInfo->buffer = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, true);
×
3832
            }
UNCOV
3833
            int sockfd = createSockFd();
×
3834
            if (sockfd < 0) {
×
3835
                g_fail = true;
×
3836
                goto END;
×
3837
            }
3838
            pThreadInfo->sockfd = sockfd;
×
3839
            break;
×
3840
        }            
3841
        // stmt & stmt2 init
3842
        case STMT_IFACE: 
×
3843
        case STMT2_IFACE: {
UNCOV
3844
            pThreadInfo->conn = initBenchConn();
×
UNCOV
3845
            if (NULL == pThreadInfo->conn) {
×
UNCOV
3846
                goto END;
×
3847
            }
3848
            // single always true for benchmark
3849
            bool single = true;
×
3850
            if (stbInfo->iface == STMT2_IFACE) {
×
3851
                // stmt2 init
3852
                if (pThreadInfo->conn->stmt2)
×
UNCOV
3853
                    taos_stmt2_close(pThreadInfo->conn->stmt2);
×
UNCOV
3854
                pThreadInfo->conn->stmt2 = initStmt2(pThreadInfo->conn->taos, single);
×
UNCOV
3855
                if (NULL == pThreadInfo->conn->stmt2) {
×
3856
                    errorPrint("taos_stmt2_init() failed, reason: %s\n", taos_errstr(NULL));
×
UNCOV
3857
                    g_fail = true;
×
3858
                    goto END;                    
×
3859
                }
3860
            } else {
3861
                // stmt init
UNCOV
3862
                if (pThreadInfo->conn->stmt)
×
UNCOV
3863
                    taos_stmt_close(pThreadInfo->conn->stmt);
×
3864
                pThreadInfo->conn->stmt = initStmt(pThreadInfo->conn->taos, single);
×
3865
                if (NULL == pThreadInfo->conn->stmt) {
×
3866
                    errorPrint("taos_stmt_init() failed, reason: %s\n", taos_errstr(NULL));
×
3867
                    g_fail = true;
×
UNCOV
3868
                    goto END;                    
×
3869
                }
3870
            }
3871

3872
            // select db
3873
            if (taos_select_db(pThreadInfo->conn->taos, pThreadInfo->dbInfo->dbName)) {
×
UNCOV
3874
                errorPrint("taos select database(%s) failed\n", pThreadInfo->dbInfo->dbName);
×
3875
                g_fail = true;
×
3876
                goto END;
×
3877
            }
3878

3879
            // malloc bind
3880
            int32_t unit = stbInfo->iface == STMT2_IFACE ? sizeof(TAOS_STMT2_BIND) : sizeof(TAOS_MULTI_BIND);
×
3881
            pThreadInfo->bind_ts       = benchCalloc(1, sizeof(int64_t), true);
×
3882
            
UNCOV
3883
            pThreadInfo->bindParams    = benchCalloc(1, unit * (stbInfo->cols->size + 1), true);
×
3884
            // have ts columns, so size + 1
UNCOV
3885
            pThreadInfo->lengths       = benchCalloc(stbInfo->cols->size + 1, sizeof(int32_t*), true);
×
3886
            for(int32_t c = 0; c <= stbInfo->cols->size; c++) {
×
3887
                pThreadInfo->lengths[c] = benchCalloc(g_arguments->reqPerReq, sizeof(int32_t), true);
×
3888
            }
3889
            // tags data
UNCOV
3890
            pThreadInfo->tagsStmt = copyBArray(stbInfo->tags);
×
3891
            for(int32_t n = 0; n < pThreadInfo->tagsStmt->size; n ++ ) {
×
3892
                Field *field = benchArrayGet(pThreadInfo->tagsStmt, n);
×
UNCOV
3893
                memset(&field->stmtData, 0, sizeof(StmtData));
×
3894
            }
3895
        
3896
            break;
×
3897
        }
3898
        // sml rest
3899
        case SML_REST_IFACE: {
×
3900
            int sockfd = createSockFd();
×
UNCOV
3901
            if (sockfd < 0) {
×
UNCOV
3902
                g_fail = true;
×
UNCOV
3903
                goto END;
×
3904
            }
3905
            pThreadInfo->sockfd = sockfd;
×
3906
        }            
3907
        // sml
3908
        case SML_IFACE: {
×
3909
            if (stbInfo->iface == SML_IFACE) {
×
3910
                pThreadInfo->conn = initBenchConn();
×
UNCOV
3911
                if (pThreadInfo->conn == NULL) {
×
UNCOV
3912
                    errorPrint("%s() init connection failed\n", __func__);
×
UNCOV
3913
                    g_fail = true;
×
UNCOV
3914
                    goto END;
×
3915
                }
3916
                if (taos_select_db(pThreadInfo->conn->taos, pThreadInfo->dbInfo->dbName)) {
×
3917
                    errorPrint("taos select database(%s) failed\n", pThreadInfo->dbInfo->dbName);
×
3918
                    g_fail = true;
×
UNCOV
3919
                    goto END;
×
3920
                }
3921
            }
3922
            pThreadInfo->max_sql_len = stbInfo->lenOfCols + stbInfo->lenOfTags;
×
3923
            if (stbInfo->iface == SML_REST_IFACE) {
×
UNCOV
3924
                pThreadInfo->buffer = benchCalloc(1, g_arguments->reqPerReq * (1 + pThreadInfo->max_sql_len), true);
×
3925
            }                
UNCOV
3926
            int protocol = stbInfo->lineProtocol;
×
3927
            if (TSDB_SML_JSON_PROTOCOL != protocol && SML_JSON_TAOS_FORMAT != protocol) {
×
3928
                pThreadInfo->sml_tags = (char **)benchCalloc(pThreadInfo->ntables, sizeof(char *), true);
×
3929
                for (int t = 0; t < pThreadInfo->ntables; t++) {
×
UNCOV
3930
                    pThreadInfo->sml_tags[t] = benchCalloc(1, stbInfo->lenOfTags, true);
×
3931
                }
3932
                int64_t index = pThreadInfo->start_table_from;
×
3933
                for (int t = 0; t < pThreadInfo->ntables; t++) {
×
3934
                    if (generateRandData(
×
3935
                                stbInfo, pThreadInfo->sml_tags[t],
×
UNCOV
3936
                                stbInfo->lenOfTags,
×
UNCOV
3937
                                stbInfo->lenOfCols + stbInfo->lenOfTags,
×
3938
                                stbInfo->tags, 1, true, NULL, index++)) {
UNCOV
3939
                        g_fail = true;            
×
UNCOV
3940
                        goto END;
×
3941
                    }
3942
                    debugPrint("pThreadInfo->sml_tags[%d]: %s\n", t,
×
3943
                               pThreadInfo->sml_tags[t]);
3944
                }
3945
                pThreadInfo->lines = benchCalloc(g_arguments->reqPerReq, sizeof(char *), true);
×
UNCOV
3946
                for (int j = 0; (j < g_arguments->reqPerReq && !g_arguments->terminate); j++) {
×
3947
                    pThreadInfo->lines[j] = benchCalloc(1, pThreadInfo->max_sql_len, true);
×
3948
                }
3949
            } else {
3950
                pThreadInfo->json_array          = tools_cJSON_CreateArray();
×
3951
                pThreadInfo->sml_json_tags       = tools_cJSON_CreateArray();
×
3952
                pThreadInfo->sml_tags_json_array = (char **)benchCalloc( pThreadInfo->ntables, sizeof(char *), true);
×
3953
                for (int t = 0; t < pThreadInfo->ntables; t++) {
×
3954
                    if (stbInfo->lineProtocol == TSDB_SML_JSON_PROTOCOL) {
×
3955
                        generateSmlJsonTags(
×
3956
                            pThreadInfo->sml_json_tags,
3957
                                pThreadInfo->sml_tags_json_array,
3958
                                stbInfo,
3959
                            pThreadInfo->start_table_from, t);
3960
                    } else {
3961
                        generateSmlTaosJsonTags(
×
3962
                            pThreadInfo->sml_json_tags, stbInfo,
3963
                            pThreadInfo->start_table_from, t);
3964
                    }
3965
                }
3966
                pThreadInfo->lines = (char **)benchCalloc(1, sizeof(char *), true);
×
UNCOV
3967
                if (0 == stbInfo->interlaceRows && TSDB_SML_JSON_PROTOCOL == protocol) {
×
3968
                    pThreadInfo->line_buf_len = g_arguments->reqPerReq * accumulateRowLen(pThreadInfo->stbInfo->tags, pThreadInfo->stbInfo->iface);
×
3969
                    debugPrint("%s() LN%d, line_buf_len=%d\n", __func__, __LINE__, pThreadInfo->line_buf_len);
×
3970
                    pThreadInfo->lines[0]             = benchCalloc(1, pThreadInfo->line_buf_len, true);
×
3971
                    pThreadInfo->sml_json_value_array = (char **)benchCalloc(pThreadInfo->ntables, sizeof(char *), true);
×
3972
                    for (int t = 0; t < pThreadInfo->ntables; t++) {
×
UNCOV
3973
                        generateSmlJsonValues(pThreadInfo->sml_json_value_array, stbInfo, t);
×
3974
                    }
3975
                }
3976
            }
3977
            break;
×
3978
        }
3979
        // taos
UNCOV
3980
        case TAOSC_IFACE: {
×
3981
            pThreadInfo->conn = initBenchConn();
×
3982
            if (pThreadInfo->conn == NULL) {
×
UNCOV
3983
                errorPrint("%s() failed to connect\n", __func__);
×
3984
                g_fail = true;
×
UNCOV
3985
                goto END;
×
3986
            }
3987
            char* command = benchCalloc(1, SHORT_1K_SQL_BUFF_LEN, false);
×
3988
            snprintf(command, SHORT_1K_SQL_BUFF_LEN,
×
3989
                    g_arguments->escape_character ? "USE `%s`" : "USE %s",
×
UNCOV
3990
                    pThreadInfo->dbInfo->dbName);
×
UNCOV
3991
            if (queryDbExecCall(pThreadInfo->conn, command)) {
×
3992
                errorPrint("taos select database(%s) failed\n", pThreadInfo->dbInfo->dbName);
×
3993
                g_fail = true;
×
3994
                goto END;
×
3995
            }
3996
            tmfree(command);
×
3997
            command = NULL;
×
3998

UNCOV
3999
            if (stbInfo->interlaceRows > 0) {
×
UNCOV
4000
                pThreadInfo->buffer = new_ds(0);
×
4001
            } else {
UNCOV
4002
                pThreadInfo->buffer = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, true);
×
4003
                if (g_arguments->check_sql) {
×
UNCOV
4004
                    pThreadInfo->csql = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, true);
×
UNCOV
4005
                    memset(pThreadInfo->csql, 0, TSDB_MAX_ALLOWED_SQL_LEN);
×
4006
                }
4007
            }
4008

4009
            break;
×
4010
        }
4011
        default:
×
4012
            break;
×
4013
    }
4014

4015
END:
×
UNCOV
4016
    return NULL;
×
4017
}
4018
    
4019

4020
// init insert thread
UNCOV
4021
/*
 * Fill in the threadInfo of every insert thread (table range or vgroup,
 * start time, stmt timestamp array) and run genInsertTheadInfo() for each of
 * them in its own thread, waiting until all of them have finished.
 *
 * database  target database
 * stbInfo   target super table
 * nthreads  number of entries in infos
 * infos     threadInfo array to initialise
 * div, mod  tables per thread and remainder; the first `mod` threads get
 *           one extra table (not used when g_arguments->bind_vgroup is set)
 *
 * Returns 0 on success, -1 when g_fail is set afterwards, including when a
 * thread could not be created here.
 */
int32_t initInsertThread(SDataBase* database, SSuperTable* stbInfo, int32_t nthreads, threadInfo *infos, int64_t div, int64_t mod) {
    uint64_t tbNext  = stbInfo->childTblFrom;
    int32_t  vgNext  = 0;
    pthread_t   *pids  = benchCalloc(1, nthreads * sizeof(pthread_t),  true);
    int threadCnt = 0;
    uint64_t * bind_ts_array = NULL;
    bool useStmt = (STMT2_IFACE == stbInfo->iface || STMT_IFACE == stbInfo->iface);

    // the timestamp array is built once and copied to every thread below
    if (useStmt) {
        bind_ts_array = benchCalloc(1, sizeof(int64_t)*g_arguments->prepared_rand, true);
        initTsArray(bind_ts_array, stbInfo);
    }

    for (int32_t i = 0; i < nthreads && !g_arguments->terminate; i++) {
        // set table
        threadInfo *pThreadInfo = infos + i;
        pThreadInfo->threadID   = i;
        pThreadInfo->dbInfo     = database;
        pThreadInfo->stbInfo    = stbInfo;
        pThreadInfo->start_time = stbInfo->startTimestamp;
        pThreadInfo->pos        = 0;
        pThreadInfo->samplePos  = 0;
        pThreadInfo->totalInsertRows = 0;
        if (useStmt) {
            pThreadInfo->bind_ts_array = benchCalloc(1, sizeof(int64_t)*g_arguments->prepared_rand, true);
            memcpy(pThreadInfo->bind_ts_array, bind_ts_array, sizeof(int64_t)*g_arguments->prepared_rand);
        }
        if (g_arguments->bind_vgroup) {
            // give this thread the next vgroup that owns at least one table
            for (int32_t j = vgNext; j < database->vgroups; j++) {
                SVGroup *vg = benchArrayGet(database->vgArray, j);
                if (0 == vg->tbCountPerVgId) {
                    continue;
                }
                pThreadInfo->vg               = vg;
                pThreadInfo->ntables          = vg->tbCountPerVgId;
                pThreadInfo->start_table_from = 0;
                pThreadInfo->end_table_to     = vg->tbCountPerVgId - 1;
                vgNext                        = j + 1;
                break;
            }
        } else {
            pThreadInfo->start_table_from = tbNext;
            pThreadInfo->ntables          = i < mod ? div + 1 : div;
            pThreadInfo->end_table_to     = i < mod ? tbNext + div : tbNext + div - 1;
            tbNext                        = pThreadInfo->end_table_to + 1;
        }

        // init conn
        int rc = pthread_create(pids + i, NULL, genInsertTheadInfo,   pThreadInfo);
        if (rc != 0) {
            // pids[i] is not a valid thread id, so it must not be joined
            errorPrint("failed to create init thread %d, code: %d\n", i, rc);
            g_fail = true;
            break;
        }
        threadCnt ++;
    }

    // wait threads (only the ones that were really started)
    for (int i = 0; i < threadCnt; i++) {
        infoPrint("init pthread_join %d ...\n", i);
        pthread_join(pids[i], NULL);
    }

    if (bind_ts_array) {
        tmfree(bind_ts_array);
    }

    tmfree(pids);
    if (g_fail) {
       return -1;
    }
    return 0;
}
4090

4091
#ifdef LINUX
4092
#define EMPTY_SLOT -1
4093
// run with limit thread
4094
/*
 * Run the nthreads insert threads with at most limitThread of them alive at
 * the same time. Each slot holds the index of the thread it is running (or
 * EMPTY_SLOT); finished slots are refilled until every thread has run.
 *
 * database, stbInfo  insert target (interlaceRows selects the writer)
 * nthreads           number of entries in infos / pids
 * limitThread        maximum number of concurrently running threads
 * infos, pids        per-thread info and thread id arrays
 *
 * Returns 0 on success, -1 when a thread could not be created.
 */
int32_t runInsertLimitThread(SDataBase* database, SSuperTable* stbInfo, int32_t nthreads, int32_t limitThread, threadInfo *infos, pthread_t *pids) {
    infoPrint("run with bind vgroups limit thread. limit threads=%d nthread=%d\n", limitThread, nthreads);

    // slots save threadInfo array index
    int32_t* slot = benchCalloc(limitThread, sizeof(int32_t), false);
    int32_t  t    = 0; // thread index
    int32_t  ret  = 0;
    bool     launchFailed = false;
    for (int32_t i = 0; i < limitThread; i++) {
        slot[i] = EMPTY_SLOT;
    }

    while (!g_arguments->terminate) {
        int32_t emptySlot = 0;
        for (int32_t i = 0; i < limitThread; i++) {
            int32_t idx = slot[i];
            // check slot thread end
            if(idx != EMPTY_SLOT) {
                if (pthread_tryjoin_np(pids[idx], NULL) == EBUSY ) {
                    // thread is running
                    toolsMsleep(2000);
                } else {
                    // thread is end , set slot empty
                    infoPrint("slot[%d] finished tidx=%d. completed thread count=%d\n", i, slot[i], t);
                    slot[i] = EMPTY_SLOT;
                }
            }

            if (slot[i] == EMPTY_SLOT && t < nthreads && !launchFailed) {
                // slot is empty , set new thread to running
                threadInfo *pThreadInfo = infos + t;
                int rc;
                if (stbInfo->interlaceRows > 0) {
                    rc = pthread_create(pids + t, NULL, syncWriteInterlace,   pThreadInfo);
                } else {
                    rc = pthread_create(pids + t, NULL, syncWriteProgressive, pThreadInfo);
                }

                if (rc != 0) {
                    // pids[t] is not valid: keep the slot empty and stop
                    // launching, the running threads are still drained
                    errorPrint("failed to create insert thread tidx=%d, code: %d\n", t, rc);
                    g_fail = true;
                    launchFailed = true;
                    ret = -1;
                } else {
                    // save current and move next
                    slot[i] = t;
                    t++;
                    infoPrint("slot[%d] start new thread tidx=%d. \n", i, slot[i]);
                }
            }

            // check slot empty
            if(slot[i] == EMPTY_SLOT) {
                emptySlot++;
            }
        }

        // check all thread end
        if(emptySlot == limitThread) {
            debugPrint("all threads(%d) is run finished.\n", nthreads);
            break;
        } else {
            debugPrint("current thread index=%d all thread=%d\n", t, nthreads);
        }
    }

    // the loop may have been left because of terminate while threads are
    // still running; join them so none outlives this function un-joined
    for (int32_t i = 0; i < limitThread; i++) {
        if (slot[i] != EMPTY_SLOT) {
            pthread_join(pids[slot[i]], NULL);
            slot[i] = EMPTY_SLOT;
        }
    }

    tmfree(slot);

    return ret;
}
4154
#endif
4155

4156
// run
4157
/**
 * Start one insert worker thread per entry in `infos` and wait for all of
 * the started workers to finish.
 *
 * Workers run syncWriteInterlace when stbInfo->interlaceRows > 0, otherwise
 * syncWriteProgressive. Thread creation stops early when
 * g_arguments->terminate is set or when pthread_create fails; only threads
 * that were actually started are joined.
 *
 * @param database  unused here, kept for interface compatibility
 * @param stbInfo   super table description (selects the worker routine)
 * @param nthreads  number of entries in `infos` / `pids`
 * @param infos     per-thread argument array
 * @param pids      output array receiving the created thread ids
 * @return 0 on success, -1 if a worker thread could not be created
 */
int32_t runInsertThread(SDataBase* database, SSuperTable* stbInfo, int32_t nthreads, threadInfo *infos, pthread_t *pids) {
    infoPrint("run insert thread. real nthread=%d\n", nthreads);

    int32_t ret = 0;

    // create threads
    int threadCnt = 0;
    for (int i = 0; i < nthreads && !g_arguments->terminate; i++) {
        threadInfo *pThreadInfo = infos + i;
        int rc;
        if (stbInfo->interlaceRows > 0) {
            rc = pthread_create(pids + i, NULL, syncWriteInterlace,   pThreadInfo);
        } else {
            rc = pthread_create(pids + i, NULL, syncWriteProgressive, pThreadInfo);
        }
        if (rc != 0) {
            // pids[i] is not a valid thread id, so it must never be joined
            errorPrint("pthread_create failed for insert thread %d, code: %d\n", i, rc);
            ret = -1;
            break;
        }
        threadCnt++;
    }

    // wait threads (only the ones that were really started)
    for (int i = 0; i < threadCnt; i++) {
        infoPrint("pthread_join %d ...\n", i);
        pthread_join(pids[i], NULL);
    }

    return ret;
}
4179

4180

4181
// exit and free resource
UNCOV
4182
/**
 * Release every per-thread resource, aggregate the per-thread statistics and
 * print the final insert report.
 *
 * Which buffers are released depends on stbInfo->iface (REST, schemaless,
 * STMT/STMT2, native). The function also frees `pids` and `infos`, so the
 * caller must not touch either array after this call.
 *
 * @param database  database description, forwarded to printTotalDelay
 * @param stbInfo   super table description (iface, protocol, interlace mode)
 * @param nthreads  number of entries in `infos`
 * @param infos     per-thread state array; freed here
 * @param pids      thread id array; freed here
 * @param spend     elapsed time of the run phase, forwarded to printTotalDelay
 * @return 0 on success, -1 if g_fail is set or printTotalDelay failed
 */
int32_t exitInsertThread(SDataBase* database, SSuperTable* stbInfo, int32_t nthreads, threadInfo *infos, pthread_t *pids, int64_t spend) {

    if (g_arguments->terminate)  toolsMsleep(100);

    BArray *  total_delay_list = benchArrayInit(1, sizeof(int64_t));
    int64_t   totalDelay = 0;
    int64_t   totalDelay1 = 0;
    int64_t   totalDelay2 = 0;
    int64_t   totalDelay3 = 0;
    uint64_t  totalInsertRows = 0;

    // free threads resource
    for (int i = 0; i < nthreads; i++) {
        threadInfo *pThreadInfo = infos + i;
        // free check sql
        if (pThreadInfo->csql) {
            tmfree(pThreadInfo->csql);
            pThreadInfo->csql = NULL;
        }

        // close conn
        int protocol = stbInfo->lineProtocol;
        switch (stbInfo->iface) {
            case REST_IFACE:
                if (g_arguments->terminate)
                    toolsMsleep(100);
                destroySockFd(pThreadInfo->sockfd);
                if (stbInfo->interlaceRows > 0) {
                    free_ds(&pThreadInfo->buffer);
                } else {
                    tmfree(pThreadInfo->buffer);
                    pThreadInfo->buffer = NULL;
                }
                break;
            case SML_REST_IFACE:
                if (g_arguments->terminate)
                    toolsMsleep(100);
                tmfree(pThreadInfo->buffer);
                pThreadInfo->buffer = NULL;
                // on-purpose no break here
            case SML_IFACE:
                if (TSDB_SML_JSON_PROTOCOL != protocol
                        && SML_JSON_TAOS_FORMAT != protocol) {
                    // line / telnet protocol: per-table tag strings and per-request lines
                    if (pThreadInfo->sml_tags) {
                        for (int t = 0; t < pThreadInfo->ntables; t++) {
                            tmfree(pThreadInfo->sml_tags[t]);
                        }
                        tmfree(pThreadInfo->sml_tags);
                        pThreadInfo->sml_tags = NULL;
                    }
                    if (pThreadInfo->lines) {
                        for (int j = 0; j < g_arguments->reqPerReq; j++) {
                            tmfree(pThreadInfo->lines[j]);
                        }
                    }
                } else {
                    // json protocol: tag arrays and cJSON trees
                    if (pThreadInfo->sml_tags_json_array) {
                        for (int t = 0; t < pThreadInfo->ntables; t++) {
                            tmfree(pThreadInfo->sml_tags_json_array[t]);
                        }
                        tmfree(pThreadInfo->sml_tags_json_array);
                        pThreadInfo->sml_tags_json_array = NULL;
                    }
                    if (pThreadInfo->sml_json_tags) {
                        tools_cJSON_Delete(pThreadInfo->sml_json_tags);
                        pThreadInfo->sml_json_tags = NULL;
                    }
                    if (pThreadInfo->json_array) {
                        tools_cJSON_Delete(pThreadInfo->json_array);
                        pThreadInfo->json_array = NULL;
                    }
                }
                if (pThreadInfo->lines) {
                    if ((0 == stbInfo->interlaceRows)
                            && (TSDB_SML_JSON_PROTOCOL == protocol)) {
                        tmfree(pThreadInfo->lines[0]);
                        if (pThreadInfo->sml_json_value_array) {
                            for (int t = 0; t < pThreadInfo->ntables; t++) {
                                tmfree(pThreadInfo->sml_json_value_array[t]);
                            }
                            tmfree(pThreadInfo->sml_json_value_array);
                            pThreadInfo->sml_json_value_array = NULL;
                        }
                    }
                    tmfree(pThreadInfo->lines);
                    pThreadInfo->lines = NULL;
                }
                break;

            case STMT_IFACE:
                // close stmt; conn may be NULL (it is checked again below)
                if (pThreadInfo->conn && pThreadInfo->conn->stmt) {
                    taos_stmt_close(pThreadInfo->conn->stmt);
                    pThreadInfo->conn->stmt = NULL;
                }
                // on-purpose no break here: STMT shares the bind buffers freed below
            case STMT2_IFACE: {
                // close stmt2
                if (pThreadInfo->conn && pThreadInfo->conn->stmt2) {
                    taos_stmt2_close(pThreadInfo->conn->stmt2);
                    pThreadInfo->conn->stmt2 = NULL;
                }

                tmfree(pThreadInfo->bind_ts);
                pThreadInfo->bind_ts = NULL;
                tmfree(pThreadInfo->bind_ts_array);
                pThreadInfo->bind_ts_array = NULL;
                tmfree(pThreadInfo->bindParams);
                pThreadInfo->bindParams = NULL;

                // free tagsStmt
                BArray *tags = pThreadInfo->tagsStmt;
                if(tags) {
                    // free child
                    for (int k = 0; k < tags->size; k++) {
                        Field * tag = benchArrayGet(tags, k);
                        tmfree(tag->stmtData.data);
                        tag->stmtData.data = NULL;
                        tmfree(tag->stmtData.is_null);
                        tag->stmtData.is_null = NULL;
                        tmfree(tag->stmtData.lengths);
                        tag->stmtData.lengths = NULL;
                    }
                    // free parent
                    benchArrayDestroy(tags);
                    pThreadInfo->tagsStmt = NULL;
                }

                // free lengths; the loop bound is inclusive, so cols->size + 1 entries are released
                if(pThreadInfo->lengths) {
                    for(int c = 0; c <= stbInfo->cols->size; c++) {
                        tmfree(pThreadInfo->lengths[c]);
                    }
                    tmfree(pThreadInfo->lengths);
                    pThreadInfo->lengths = NULL;
                }
                break;
            }

            case TAOSC_IFACE:
                if (stbInfo->interlaceRows > 0) {
                    free_ds(&pThreadInfo->buffer);
                } else {
                    tmfree(pThreadInfo->buffer);
                    pThreadInfo->buffer = NULL;
                }
                break;

            default:
                break;
        }

        // accumulate statistics of this thread
        totalInsertRows += pThreadInfo->totalInsertRows;
        totalDelay += pThreadInfo->totalDelay;
        totalDelay1 += pThreadInfo->totalDelay1;
        totalDelay2 += pThreadInfo->totalDelay2;
        totalDelay3 += pThreadInfo->totalDelay3;
        if (pThreadInfo->delayList) {
            benchArrayAddBatch(total_delay_list, pThreadInfo->delayList->pData,
                    pThreadInfo->delayList->size, true);
            tmfree(pThreadInfo->delayList);
            pThreadInfo->delayList = NULL;
        }
        //  free conn
        if (pThreadInfo->conn) {
            closeBenchConn(pThreadInfo->conn);
            pThreadInfo->conn = NULL;
        }
    }

    // calculate result
    qsort(total_delay_list->pData, total_delay_list->size,
            total_delay_list->elemSize, compare);

    if (g_arguments->terminate)  toolsMsleep(100);

    tmfree(pids);
    tmfree(infos);

    // print result
    int ret = printTotalDelay(database, totalDelay, totalDelay1, totalDelay2, totalDelay3,
                              total_delay_list, nthreads, totalInsertRows, spend);
    benchArrayDestroy(total_delay_list);
    if (g_fail || ret != 0) {
        return -1;
    }
    return 0;
}
4352

4353
/**
 * Insert data into one super table using multiple worker threads.
 *
 * Steps: validate arguments, obtain the table count, decide the real thread
 * count (per vgroup when g_arguments->bind_vgroup is set, otherwise by
 * splitting tables evenly), initialise the per-thread state, run the
 * workers, then release everything and print the report through
 * exitInsertThread.
 *
 * @param database  database that owns the super table
 * @param stbInfo   super table to insert into
 * @return 0 on success; non-zero if validation, initialisation, the run
 *         phase or the exit phase failed
 */
static int startMultiThreadInsertData(SDataBase* database, SSuperTable* stbInfo) {
    if ((stbInfo->iface == SML_IFACE || stbInfo->iface == SML_REST_IFACE)
            && !stbInfo->use_metric) {
        errorPrint("%s", "schemaless cannot work without stable\n");
        return -1;
    }

    // check argument valid
    preProcessArgument(stbInfo);

    // ntable
    int64_t ntables = obtainTableCount(database, stbInfo);
    if (ntables == 0) {
        errorPrint("insert table count is zero. %s.%s\n", database->dbName, stbInfo->stbName);
        return -1;
    }

    // assign table to thread
    int32_t  nthreads  = g_arguments->nthreads;
    int64_t  div       = 0;  // ntable / nthread  division
    int64_t  mod       = 0;  // ntable % nthread
    int64_t  spend     = 0;

    if (g_arguments->bind_vgroup) {
        // div and mod stay 0 in this mode
        nthreads = assignTableToThread(database, stbInfo);
        if(nthreads == 0) {
            errorPrint("bind vgroup assign theads count is zero. %s.%s\n", database->dbName, stbInfo->stbName);
            return -1;
        }
    } else {
        if(nthreads == 0) {
            errorPrint("argument thread_count can not be zero. %s.%s\n", database->dbName, stbInfo->stbName);
            return -1;
        }
        div = ntables / nthreads;
        if (div < 1) {
            // fewer tables than threads: one table per thread
            nthreads = (int32_t)ntables;
            div = 1;
        }
        mod = ntables % nthreads;
    }


    // init each thread information
    pthread_t   *pids  = benchCalloc(1, nthreads * sizeof(pthread_t),  true);
    threadInfo  *infos = benchCalloc(1, nthreads * sizeof(threadInfo), true);

    // init
    int32_t ret = initInsertThread(database, stbInfo, nthreads, infos, div, mod);
    if( ret != 0) {
        errorPrint("init insert thread failed. %s.%s\n", database->dbName, stbInfo->stbName);
        tmfree(pids);
        tmfree(infos);
        return ret;
    }

    infoPrint("Estimate memory usage: %.2fMB\n", (double)g_memoryUsage / 1048576);
    prompt(0);


    // run
    int64_t start = toolsGetTimestampUs();
    if(g_arguments->bind_vgroup && g_arguments->nthreads < nthreads ) {
        // need many batch execute all threads
#ifdef LINUX
        ret = runInsertLimitThread(database, stbInfo, nthreads, g_arguments->nthreads, infos, pids);
#else
        ret = runInsertThread(database, stbInfo, nthreads, infos, pids);
#endif
    } else {
        // only one batch execute all threads
        ret = runInsertThread(database, stbInfo, nthreads, infos, pids);
    }
    // remember the run result: the exit phase below reuses `ret`
    int32_t runRet = ret;

    int64_t end = toolsGetTimestampUs();
    if(end == start) {
        // avoid a zero elapsed time
        spend = 1;
    } else {
        spend = end - start;
    }

    // exit (always executed: it releases pids, infos and all per-thread resources)
    ret = exitInsertThread(database, stbInfo, nthreads, infos, pids, spend);
    return (runRet != 0) ? runRet : ret;
}
4438

4439
/**
 * Query how many rows the super table currently holds.
 *
 * Runs "SELECT COUNT(*) FROM db.stb" on the given connection. The server
 * returns a 64-bit count; values above INT32_MAX are saturated so that a
 * large table can never be reported as a negative number, which callers
 * treat as an error.
 *
 * @param dbName   database name
 * @param stbName  super table name
 * @param taos     open connection used for the query
 * @return row count (saturated to INT32_MAX), 0 when no row is returned,
 *         -1 if the query failed
 */
static int getStbInsertedRows(char* dbName, char* stbName, TAOS* taos) {
    char command[SHORT_1K_SQL_BUFF_LEN];
    snprintf(command, SHORT_1K_SQL_BUFF_LEN, "SELECT COUNT(*) FROM %s.%s",
             dbName, stbName);

    TAOS_RES* res = taos_query(taos, command);
    int code = taos_errno(res);
    if (code != 0) {
        // NOTE(review): res is handed to printErrCmdCodeStr and not released
        // here; presumably that helper frees it -- confirm.
        printErrCmdCodeStr(command, code, res);
        return -1;
    }

    int rows = 0;
    TAOS_ROW row = taos_fetch_row(res);
    if (row != NULL && row[0] != NULL) {
        int64_t count = *(int64_t*)row[0];
        if (count > (int64_t)INT32_MAX) {
            rows = INT32_MAX;
        } else {
            rows = (int)count;
        }
    }
    taos_free_result(res);
    return rows;
}
4459

4460
/**
 * Build and execute the "CREATE sma INDEX" statement for one tsma.
 *
 * The statement is assembled in a fixed-size stack buffer. If it does not
 * fit, nothing is executed and an error is logged, instead of writing past
 * the end of the buffer or sending a truncated statement.
 *
 * @param tsma     tsma definition (name, func, interval, sliding, optional custom suffix)
 * @param conn     connection used to execute the statement
 * @param stbName  super table the index is created on
 */
static void create_tsma(TSMA* tsma, SBenchConn* conn, char* stbName) {
    char command[SHORT_1K_SQL_BUFF_LEN];
    int len = snprintf(command, SHORT_1K_SQL_BUFF_LEN,
                       "CREATE sma INDEX %s ON %s function(%s) "
                       "INTERVAL (%s) SLIDING (%s)",
                       tsma->name, stbName, tsma->func,
                       tsma->interval, tsma->sliding);
    // snprintf returns the length it wanted to write; >= size means truncated
    if (len < 0 || len >= SHORT_1K_SQL_BUFF_LEN) {
        errorPrint("create tsma command for %s does not fit in %d bytes, skipped\n",
                   tsma->name, (int)SHORT_1K_SQL_BUFF_LEN);
        return;
    }
    if (tsma->custom) {
        int remain = SHORT_1K_SQL_BUFF_LEN - len;
        int extra  = snprintf(command + len, remain, " %s", tsma->custom);
        if (extra < 0 || extra >= remain) {
            errorPrint("create tsma command for %s does not fit in %d bytes, skipped\n",
                       tsma->name, (int)SHORT_1K_SQL_BUFF_LEN);
            return;
        }
    }
    int code = queryDbExecCall(conn, command);
    if (code == 0) {
        infoPrint("successfully create tsma with command <%s>\n", command);
    }
}
×
4476

4477
static void* create_tsmas(void* args) {
×
4478
    tsmaThreadInfo* pThreadInfo = (tsmaThreadInfo*) args;
×
UNCOV
4479
    int inserted_rows = 0;
×
UNCOV
4480
    SBenchConn* conn = initBenchConn();
×
4481
    if (NULL == conn) {
×
4482
        return NULL;
×
4483
    }
4484
    int finished = 0;
×
UNCOV
4485
    if (taos_select_db(conn->taos, pThreadInfo->dbName)) {
×
4486
        errorPrint("failed to use database (%s)\n", pThreadInfo->dbName);
×
4487
        closeBenchConn(conn);
×
4488
        return NULL;
×
4489
    }
4490
    while (finished < pThreadInfo->tsmas->size && inserted_rows >= 0) {
×
UNCOV
4491
        inserted_rows = (int)getStbInsertedRows(
×
4492
                pThreadInfo->dbName, pThreadInfo->stbName, conn->taos);
4493
        for (int i = 0; i < pThreadInfo->tsmas->size; i++) {
×
4494
            TSMA* tsma = benchArrayGet(pThreadInfo->tsmas, i);
×
UNCOV
4495
            if (!tsma->done &&  inserted_rows >= tsma->start_when_inserted) {
×
4496
                create_tsma(tsma, conn, pThreadInfo->stbName);
×
UNCOV
4497
                tsma->done = true;
×
4498
                finished++;
×
4499
                break;
×
4500
            }
4501
        }
4502
        toolsMsleep(10);
×
4503
    }
4504
    benchArrayDestroy(pThreadInfo->tsmas);
×
UNCOV
4505
    closeBenchConn(conn);
×
UNCOV
4506
    return NULL;
×
4507
}
4508

4509
void changeGlobalIface() {
×
4510
    if (g_arguments->databases->size == 1) {
×
4511
            SDataBase *db = benchArrayGet(g_arguments->databases, 0);
×
UNCOV
4512
            if (db && db->superTbls->size == 1) {
×
4513
                SSuperTable *stb = benchArrayGet(db->superTbls, 0);
×
UNCOV
4514
                if (stb) {
×
4515
                    if(g_arguments->iface != stb->iface) {
×
4516
                        infoPrint("only 1 db 1 super table, g_arguments->iface(%d) replace with stb->iface(%d) \n", g_arguments->iface, stb->iface);
×
UNCOV
4517
                        g_arguments->iface = stb->iface;
×
4518
                    }
4519
                }
4520
            }
4521
    }
4522
}
×
4523

4524
int insertTestProcess() {
×
UNCOV
4525
    prompt(0);
×
4526

4527
    encodeAuthBase64();
×
4528
    // if only one stable, global iface same with stable->iface
UNCOV
4529
    changeGlobalIface();
×
4530

4531
    // move from loop to here
UNCOV
4532
    if (isRest(g_arguments->iface)) {
×
4533
        if (0 != convertServAddr(g_arguments->iface,
×
4534
                                 false,
4535
                                 1)) {
4536
            return -1;
×
4537
        }
4538
    }    
4539

4540
    //loop create database 
4541
    for (int i = 0; i < g_arguments->databases->size; i++) {
×
4542
        SDataBase * database = benchArrayGet(g_arguments->databases, i);
×
4543

UNCOV
4544
        if (database->drop && !(g_arguments->supplementInsert)) {
×
UNCOV
4545
            if (database->superTbls && database->superTbls->size > 0) {
×
UNCOV
4546
                SSuperTable * stbInfo = benchArrayGet(database->superTbls, 0);
×
4547
                if (stbInfo && isRest(stbInfo->iface)) {
×
4548
                    if (0 != convertServAddr(stbInfo->iface,
×
UNCOV
4549
                                             stbInfo->tcpTransfer,
×
UNCOV
4550
                                             stbInfo->lineProtocol)) {
×
UNCOV
4551
                        return -1;
×
4552
                    }
4553
                }
4554
            }
4555

UNCOV
4556
            if (createDatabase(database)) {
×
UNCOV
4557
                errorPrint("failed to create database (%s)\n",
×
4558
                        database->dbName);
4559
                return -1;
×
4560
            }
UNCOV
4561
            succPrint("created database (%s)\n", database->dbName);
×
UNCOV
4562
        } else if(g_arguments->bind_vgroup) {
×
4563
            // database already exist, get vgroups from server
4564
            SBenchConn* conn = initBenchConn();
×
UNCOV
4565
            if (conn) {
×
4566
                int32_t vgroups = getVgroupsNative(conn, database);
×
4567
                if (vgroups <=0) {
×
UNCOV
4568
                    closeBenchConn(conn);
×
UNCOV
4569
                    errorPrint("Database %s's vgroups is zero , db exist case.\n", database->dbName);
×
UNCOV
4570
                    return -1;
×
4571
                }
4572
                closeBenchConn(conn);
×
UNCOV
4573
                succPrint("Database (%s) get vgroups num is %d from server.\n", database->dbName, vgroups);
×
4574
            }
4575
        }
4576
    }
4577

4578
    // create super table && fill child tables && prepareSampleData
4579
    for (int i = 0; i < g_arguments->databases->size; i++) {
×
4580
        SDataBase * database = benchArrayGet(g_arguments->databases, i);
×
UNCOV
4581
        if (database->superTbls) {
×
UNCOV
4582
            for (int j = 0; j < database->superTbls->size; j++) {
×
UNCOV
4583
                SSuperTable * stbInfo = benchArrayGet(database->superTbls, j);
×
UNCOV
4584
                if (stbInfo->iface != SML_IFACE
×
UNCOV
4585
                        && stbInfo->iface != SML_REST_IFACE
×
UNCOV
4586
                        && !stbInfo->childTblExists) {
×
UNCOV
4587
                    int code = getSuperTableFromServer(database, stbInfo);
×
4588
                    if (code == TSDB_CODE_FAILED) {
×
4589
                        return -1;
×
4590
                    }
4591
                    
4592
                    // with create table if not exists, so if exist, can not report failed
4593
                    if (createSuperTable(database, stbInfo)) {
×
UNCOV
4594
                        return -1;
×
4595
                    }
4596
                    
4597
                }
4598
                // fill last ts from super table
4599
                if(stbInfo->autoFillback && stbInfo->childTblExists) {
×
UNCOV
4600
                    fillSTableLastTs(database, stbInfo);
×
4601
                }
4602

4603
                // calc now 
4604
                if(stbInfo->calcNow) {
×
4605
                    calcExprFromServer(database, stbInfo);
×
4606
                }
4607

4608
                // check fill child table count valid
UNCOV
4609
                if(fillChildTblName(database, stbInfo) <= 0) {
×
UNCOV
4610
                    infoPrint(" warning fill childs table count is zero, db:%s stb: %s \n", database->dbName, stbInfo->stbName);
×
4611
                }
UNCOV
4612
                if (0 != prepareSampleData(database, stbInfo)) {
×
UNCOV
4613
                    return -1;
×
4614
                }
4615

4616
                // early malloc buffer for auto create table
4617
                if((stbInfo->iface == STMT_IFACE || stbInfo->iface == STMT2_IFACE) && stbInfo->autoTblCreating) {
×
4618
                    prepareTagsStmt(stbInfo);
×
4619
                }
4620

4621
                // execute sqls
4622
                if (stbInfo->sqls) {
×
4623
                    char **sqls = stbInfo->sqls;
×
UNCOV
4624
                    while (*sqls) {
×
4625
                        queryDbExec(database, stbInfo, *sqls);
×
4626
                        sqls++;
×
4627
                    } 
4628
                }
4629
            }
4630
        }
4631
    }
4632

4633
    // tsma
UNCOV
4634
    if (g_arguments->taosc_version == 3) {
×
UNCOV
4635
        for (int i = 0; i < g_arguments->databases->size; i++) {
×
UNCOV
4636
            SDataBase* database = benchArrayGet(g_arguments->databases, i);
×
UNCOV
4637
            if (database->superTbls) {
×
UNCOV
4638
                for (int j = 0; (j < database->superTbls->size
×
4639
                        && !g_arguments->terminate); j++) {
×
4640
                    SSuperTable* stbInfo =
UNCOV
4641
                        benchArrayGet(database->superTbls, j);
×
UNCOV
4642
                    if (stbInfo->tsmas == NULL) {
×
UNCOV
4643
                        continue;
×
4644
                    }
UNCOV
4645
                    if (stbInfo->tsmas->size > 0) {
×
4646
                        tsmaThreadInfo* pThreadInfo =
4647
                            benchCalloc(1, sizeof(tsmaThreadInfo), true);
×
4648
                        pthread_t tsmas_pid = {0};
×
4649
                        pThreadInfo->dbName = database->dbName;
×
UNCOV
4650
                        pThreadInfo->stbName = stbInfo->stbName;
×
UNCOV
4651
                        pThreadInfo->tsmas = stbInfo->tsmas;
×
UNCOV
4652
                        pthread_create(&tsmas_pid, NULL,
×
4653
                                       create_tsmas, pThreadInfo);
4654
                    }
4655
                }
4656
            }
4657
        }
4658
    }
4659

UNCOV
4660
    if (createChildTables()) return -1;
×
4661

4662
    // create sub threads for inserting data
4663
    for (int i = 0; i < g_arguments->databases->size; i++) {
×
4664
        SDataBase * database = benchArrayGet(g_arguments->databases, i);
×
4665
        if (database->superTbls) {
×
UNCOV
4666
            for (uint64_t j = 0; j < database->superTbls->size; j++) {
×
UNCOV
4667
                SSuperTable * stbInfo = benchArrayGet(database->superTbls, j);
×
4668
                if (stbInfo->insertRows == 0) {
×
4669
                    continue;
×
4670
                }
4671
                prompt(stbInfo->non_stop);
×
4672
                if (startMultiThreadInsertData(database, stbInfo)) {
×
UNCOV
4673
                    return -1;
×
4674
                }
4675
            }
4676
        }
4677
    }
UNCOV
4678
    return 0;
×
4679
}
4680

4681
//
4682
//     ------- STMT 2 -----------
4683
//
4684

4685
static int32_t stmt2BindAndSubmit(
×
4686
        threadInfo *pThreadInfo,
4687
        SChildTable *childTbl,
4688
        int64_t *timestamp, uint64_t i, char *ttl, int32_t *pkCur, int32_t *pkCnt, int64_t *delay1,
4689
        int64_t *delay3, int64_t* startTs, int64_t* endTs, int32_t w) {
4690
    
4691
    // create bindV
UNCOV
4692
    int32_t count            = 1;
×
UNCOV
4693
    TAOS_STMT2_BINDV * bindv = createBindV(count, 0, 0);
×
UNCOV
4694
    TAOS_STMT2 *stmt2        = pThreadInfo->conn->stmt2;
×
UNCOV
4695
    SSuperTable *stbInfo     = pThreadInfo->stbInfo;
×
4696

4697
    //
4698
    // bind
4699
    //
4700

4701
    // count
UNCOV
4702
    bindv->count = 1;
×
4703
    // tbnames
UNCOV
4704
    bindv->tbnames[0] = childTbl->name;
×
4705
    // tags
4706
    //bindv->tags[0] = NULL; // Progrssive mode tag put on prepare sql, no need put here
4707
   
4708
    // bind_cols
UNCOV
4709
    uint32_t batch = (g_arguments->reqPerReq > stbInfo->insertRows - i) ? (stbInfo->insertRows - i) : g_arguments->reqPerReq;
×
UNCOV
4710
    int32_t n = 0;
×
UNCOV
4711
    int64_t pos = i % g_arguments->prepared_rand;
×
4712

4713
    // adjust batch about pos
UNCOV
4714
    if(g_arguments->prepared_rand - pos < batch ) {
×
UNCOV
4715
        debugPrint("prepared_rand(%" PRId64 ") is not a multiple of num_of_records_per(%d), the batch size can be modify. before=%d after=%d\n", 
×
4716
                    (int64_t)g_arguments->prepared_rand, (int32_t)g_arguments->reqPerReq, (int32_t)batch, (int32_t)(g_arguments->prepared_rand - pos));
UNCOV
4717
        batch = g_arguments->prepared_rand - pos;
×
4718
    } 
4719

UNCOV
4720
    if (batch == 0) {
×
UNCOV
4721
        infoPrint("batch size is zero. pos = %"PRId64"\n", pos);
×
UNCOV
4722
        return 0;
×
4723
    }
4724

UNCOV
4725
    uint32_t generated = bindVColsProgressive(bindv, 0, pThreadInfo, batch, *timestamp, pos, childTbl, pkCur, pkCnt, &n);
×
UNCOV
4726
    if(generated == 0) {
×
UNCOV
4727
        errorPrint( "get cols data bind information failed. table: %s\n", childTbl->name);
×
UNCOV
4728
        freeBindV(bindv);
×
UNCOV
4729
        return -1;
×
4730
    }
UNCOV
4731
    *timestamp += n * stbInfo->timestamp_step;
×
4732

UNCOV
4733
    if (g_arguments->debug_print) {
×
UNCOV
4734
        showBindV(bindv, stbInfo->tags, stbInfo->cols);
×
4735
    }
4736

4737
    // bind and submit
UNCOV
4738
    int32_t code = submitStmt2(pThreadInfo, bindv, delay1, delay3, startTs, endTs, &generated, w);
×
4739
    // free
UNCOV
4740
    freeBindV(bindv);
×
4741

UNCOV
4742
    if(code != 0) {
×
UNCOV
4743
        errorPrint( "failed submitStmt2() progressive mode, table: %s . engine error: %s\n", childTbl->name, taos_stmt2_error(stmt2));
×
UNCOV
4744
        return code;
×
4745
    } else {
UNCOV
4746
        debugPrint("succ submitStmt2 progressive mode. table=%s batch=%d pos=%" PRId64 " ts=%" PRId64 " generated=%d\n",
×
4747
                childTbl->name, batch, pos, *timestamp, generated);
UNCOV
4748
        return generated;
×
4749
    }
4750
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc