• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / taos-tools / 12929200663

23 Jan 2025 12:30PM UTC coverage: 74.953% (-0.3%) from 75.249%
12929200663

Pull #839

github

web-flow
Merge 5b5d1f387 into 84c50c54f
Pull Request #839: FIX mixed query mode no need calc thread

388 of 548 new or added lines in 4 files covered. (70.8%)

243 existing lines in 9 files now uncovered.

12434 of 16589 relevant lines covered (74.95%)

321013.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.81
/src/benchQuery.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the MIT license as published by the Free Software
6
 * Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 */
12

13
#include <bench.h>
14
#include "benchLog.h"
15

16
// query and get result; when record is true the request is counted in the per-thread totals
17
/*
 * Execute one SQL command for the calling query thread.
 *
 * pThreadInfo  per-thread state; supplies the connection (native interface)
 *              or socket (REST interface) and the result file path, and
 *              receives the nSucc / nFail counter updates
 * command      SQL text to execute
 * record       when true the result set is passed to fetchResult() and the
 *              request is counted in nSucc / nFail; when false the statement
 *              is executed without fetching or counting
 *
 * Returns 0 on success. On failure returns -1, unless continueIfFail is
 * YES_IF_FAILED, in which case 0 is returned so the caller keeps going.
 * Returns -1 immediately when termination has been requested.
 */
int selectAndGetResult(qThreadInfo *pThreadInfo, char *command, bool record) {
    // user cancel
    if (g_arguments->terminate) {
        return -1;
    }

    int ret = 0;

    if (g_queryInfo.iface == REST_IFACE) {
        // execute sql through the REST interface
        uint32_t threadID = pThreadInfo->threadID;
        int retCode = postProceSql(command, g_queryInfo.dbName, 0, REST_IFACE,
                                   0, g_arguments->port, false,
                                   pThreadInfo->sockfd, pThreadInfo->filePath);
        if (0 != retCode) {
            errorPrint("====restful return fail, threadID[%u]\n", threadID);
            ret = -1;
        }
    } else {
        // execute sql through the native connection
        TAOS     *taos = pThreadInfo->conn->taos;
        int64_t   rows = 0;
        TAOS_RES *res  = taos_query(taos, command);
        int       code = taos_errno(res);
        if (res == NULL || code) {
            // failed query
            errorPrint("failed to execute sql:%s, "
                        "code: 0x%08x, reason:%s\n",
                        command, code, taos_errstr(res));
            ret = -1;
        } else if (record) {
            // succ query: only recorded requests fetch their result
            rows = fetchResult(res, pThreadInfo->filePath);
        }

        // free result
        if (res) {
            taos_free_result(res);
        }
        debugPrint("query sql:%s rows:%"PRId64"\n", command, rows);
    }

    // record count
    if (ret == 0) {
        if (record) {
            pThreadInfo->nSucc++;
        }
        return 0;
    }

    if (record) {
        pThreadInfo->nFail++;
    }

    // continue option: report success so the caller does not abort
    if (YES_IF_FAILED == g_arguments->continueIfFail) {
        ret = 0;
    }
    return ret;
}
81

82
// intelligent sleep
83
void autoSleep(uint64_t delay) {
882✔
84
    if (g_queryInfo.specifiedQueryInfo.queryInterval &&
882✔
85
        delay < (int64_t)g_queryInfo.specifiedQueryInfo.queryInterval * 1000) {
22✔
86
        toolsMsleep((int32_t)(
3✔
87
                g_queryInfo.specifiedQueryInfo.queryInterval*1000 - delay));  // ms
3✔
88
    }
89
}
882✔
90

91
// reset 
92
/*
 * Issue "RESET QUERY CACHE" on the thread's connection.
 * The statement is executed with record == false, so it is not counted in
 * the thread's nSucc / nFail statistics.
 *
 * Returns 0 on success, -1 (after logging) when the statement fails.
 */
int32_t resetQueryCache(qThreadInfo* pThreadInfo) {
    int rc = selectAndGetResult(pThreadInfo, "RESET QUERY CACHE", false);
    if (rc == 0) {
        return 0;
    }

    errorPrint("%s() LN%d, reset query cache failed\n", __func__, __LINE__);
    return -1;
}
101

102

103

104
//
105
//  ---------------------------------  second level functions for threads -----------------------------------
106
//
107

108
// show real QPS
109
/*
 * Print the thread's running query count and QPS at most once every
 * 10 seconds.
 *
 * pThreadInfo    thread whose nSucc (plus nFail when continueIfFail is
 *                YES_IF_FAILED) is reported
 * lastPrintTime  timestamp (ms) of the previous report
 * startTs        timestamp (ms) at which the thread started querying
 *
 * Returns the timestamp to use as lastPrintTime on the next call: the
 * current time when a report was printed, otherwise lastPrintTime unchanged.
 */
int64_t showRealQPS(qThreadInfo* pThreadInfo, int64_t lastPrintTime, int64_t startTs) {
    int64_t nowMs = toolsGetTimestampMs();

    // not yet more than 10 seconds since the last report
    if (!(nowMs - lastPrintTime > 10 * 1000)) {
        return lastPrintTime;
    }

    // real total
    uint64_t done = pThreadInfo->nSucc;
    if (g_arguments->continueIfFail == YES_IF_FAILED) {
        done += pThreadInfo->nFail;
    }

    infoPrint(
        "thread[%d] has currently completed queries: %" PRIu64 ", QPS: %10.3f\n",
        pThreadInfo->threadID, done,
        (double)(done / ((nowMs - startTs) / 1000.0)));
    return nowMs;
}
126

127
// spec query mixed thread
128
static void *specQueryMixThread(void *sarg) {
6✔
129
    qThreadInfo *pThreadInfo = (qThreadInfo*)sarg;
6✔
130
#ifdef LINUX
131
    prctl(PR_SET_NAME, "specQueryMixThread");
6✔
132
#endif
133
    // use db
134
    if (g_queryInfo.dbName) {
6✔
135
        if (pThreadInfo->conn &&
6✔
136
            pThreadInfo->conn->taos &&
12✔
137
            taos_select_db(pThreadInfo->conn->taos, g_queryInfo.dbName)) {
6✔
NEW
138
                errorPrint("thread[%d]: failed to select database(%s)\n", pThreadInfo->threadID, g_queryInfo.dbName);
×
139
                return NULL;
×
140
        }
141
    }
142

143
    int64_t st = 0;
6✔
144
    int64_t et = 0;
6✔
145
    int64_t startTs = toolsGetTimestampMs();
6✔
146
    int64_t lastPrintTime = startTs;
6✔
147
    uint64_t  queryTimes = g_queryInfo.specifiedQueryInfo.queryTimes;
6✔
148
    pThreadInfo->query_delay_list = benchArrayInit(queryTimes, sizeof(int64_t));
6✔
149
    for (int i = pThreadInfo->start_sql; i <= pThreadInfo->end_sql; ++i) {
12✔
150
        SSQL * sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i);
6✔
151
        for (int j = 0; j < queryTimes; ++j) {
230✔
152
            // use cancel
153
            if(g_arguments->terminate) {
224✔
NEW
154
                infoPrint("%s\n", "user cancel , so exit testing.");
×
NEW
155
                break;
×
156
            }
157

158
            // reset cache
159
            if (g_queryInfo.reset_query_cache) {
224✔
NEW
160
                if (resetQueryCache(pThreadInfo)) {
×
NEW
161
                    errorPrint("%s() LN%d, reset query cache failed\n", __func__, __LINE__);
×
UNCOV
162
                    return NULL;
×
163
                }
164
            }
165

166
            // execute sql
167
            st = toolsGetTimestampUs();
224✔
168
            int ret = selectAndGetResult(pThreadInfo, sql->command, true);
224✔
169
            if (ret) {
224✔
NEW
170
                g_fail = true;
×
NEW
171
                errorPrint("failed call mix selectAndGetResult, i=%d j=%d", i, j);
×
NEW
172
                return NULL;
×
173
            }
174
            et = toolsGetTimestampUs();
224✔
175

176
            // sleep
177
            autoSleep(et - st);
224✔
178

179
            // delay
180
            if (ret == 0) {
224✔
181
                int64_t* delay = benchCalloc(1, sizeof(int64_t), false);
224✔
182
                *delay = et - st;
224✔
183
                debugPrint("%s() LN%d, delay: %"PRId64"\n", __func__, __LINE__, *delay);
224✔
184

185
                pThreadInfo->total_delay += *delay;
224✔
186
                if(benchArrayPush(pThreadInfo->query_delay_list, delay) == NULL){
224✔
NEW
187
                    tmfree(delay);
×
188
                }
189
            }
190

191
            // real show
192
            lastPrintTime = showRealQPS(pThreadInfo, lastPrintTime, startTs);
224✔
193
        }
194
    }
195

196
    return NULL;
6✔
197
}
198

199
// spec query thread
200
static void *specQueryThread(void *sarg) {
22✔
201
    qThreadInfo *pThreadInfo = (qThreadInfo *)sarg;
22✔
202
#ifdef LINUX
203
    prctl(PR_SET_NAME, "specQueryThread");
22✔
204
#endif
205
    uint64_t st = 0;
22✔
206
    uint64_t et = 0;
22✔
207
    int32_t  index = 0;
22✔
208

209
    // use db
210
    if (g_queryInfo.dbName) {
22✔
211
        if (pThreadInfo->conn &&
22✔
212
            pThreadInfo->conn->taos &&
44✔
213
            taos_select_db(pThreadInfo->conn->taos, g_queryInfo.dbName)) {
22✔
214
                errorPrint("thread[%d]: failed to select database(%s)\n", pThreadInfo->threadID, g_queryInfo.dbName);
×
215
                return NULL;
×
216
        }
217
    }
218

219
    uint64_t  queryTimes = g_queryInfo.specifiedQueryInfo.queryTimes;
22✔
220
    pThreadInfo->query_delay_list = benchArrayInit(queryTimes, sizeof(int64_t));
22✔
221

222
    uint64_t  startTs       = toolsGetTimestampMs();
22✔
223
    uint64_t  lastPrintTime = startTs;
22✔
224

225
    SSQL * sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, pThreadInfo->querySeq);
22✔
226

227
    if (sql->result[0] != '\0') {
22✔
228
        snprintf(pThreadInfo->filePath, MAX_PATH_LEN, "%s-%d",
10✔
229
                sql->result, pThreadInfo->threadID);
10✔
230
    }
231

232
    while (index < queryTimes) {
680✔
233
        // use cancel
234
        if(g_arguments->terminate) {
656✔
NEW
235
            infoPrint("thread[%d] user cancel , so exit testing.\n", pThreadInfo->threadID);
×
NEW
236
            break;
×
237
        }
238

239
        // reset cache
240
        if (g_queryInfo.reset_query_cache) {
656✔
241
            if (resetQueryCache(pThreadInfo)) {
6✔
NEW
242
                errorPrint("%s() LN%d, reset query cache failed\n", __func__, __LINE__);
×
UNCOV
243
                return NULL;
×
244
            }
245
        }
246

247
        // execute sql
248
        st = toolsGetTimestampUs();
656✔
249
        int ret = selectAndGetResult(pThreadInfo, sql->command, true);
656✔
250
        if (ret) {
656✔
UNCOV
251
            g_fail = true;
×
NEW
252
            errorPrint("failed call spec selectAndGetResult, index=%d\n", index);
×
NEW
253
            break;
×
254
        }
255
        et = toolsGetTimestampUs();
656✔
256

257
        // sleep
258
        autoSleep(et - st);
658✔
259

260
        uint64_t delay = et - st;
657✔
261
        debugPrint("%s() LN%d, delay: %"PRIu64"\n", __func__, __LINE__, delay);
657✔
262

263
        if (ret == 0) {
657✔
264
            // only succ add delay list
265
            benchArrayPushNoFree(pThreadInfo->query_delay_list, &delay);
658✔
266
            pThreadInfo->total_delay += delay;
657✔
267
        }
268
        index++;
656✔
269

270
        // real show
271
        lastPrintTime = showRealQPS(pThreadInfo, lastPrintTime, startTs);
656✔
272
    }
273

274
    return NULL;
24✔
275
}
276

277
// super table query thread
278
static void *stbQueryThread(void *sarg) {
7✔
279
    char *sqlstr = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, false);
7✔
280
    qThreadInfo *pThreadInfo = (qThreadInfo *)sarg;
7✔
281
#ifdef LINUX
282
    prctl(PR_SET_NAME, "stbQueryThread");
7✔
283
#endif
284

285
    uint64_t st = 0;
7✔
286
    uint64_t et = 0;
7✔
287

288
    uint64_t queryTimes = g_queryInfo.superQueryInfo.queryTimes;
7✔
289
    pThreadInfo->query_delay_list = benchArrayInit(queryTimes, sizeof(uint64_t));
7✔
290
    
291
    uint64_t startTs = toolsGetTimestampMs();
7✔
292
    uint64_t lastPrintTime = startTs;
7✔
293
    while (queryTimes--) {
7✔
294
        // use cancel
295
        if(g_arguments->terminate) {
7✔
NEW
296
            infoPrint("%s\n", "user cancel , so exit testing.");
×
NEW
297
            break;
×
298
        }
299

300
        // reset cache
301
        if (g_queryInfo.reset_query_cache) {
7✔
302
            if (resetQueryCache(pThreadInfo)) {
1✔
NEW
303
                errorPrint("%s() LN%d, reset query cache failed\n", __func__, __LINE__);
×
NEW
304
                return NULL;
×
305
            }
306
        }
307

308
        // execute
309
        st = toolsGetTimestampMs();
7✔
310
        // for each table
311
        for (int i = (int)pThreadInfo->start_table_from; i <= pThreadInfo->end_table_to; i++) {
7✔
312
            // use cancel
313
            if(g_arguments->terminate) {
7✔
NEW
314
                infoPrint("%s\n", "user cancel , so exit testing.");
×
NEW
315
                break;
×
316
            }
317

318
            // for each sql
319
            for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) {
7✔
320
                memset(sqlstr, 0, TSDB_MAX_ALLOWED_SQL_LEN);
7✔
321
                // use cancel
322
                if(g_arguments->terminate) {
7✔
NEW
323
                    infoPrint("%s\n", "user cancel , so exit testing.");
×
NEW
324
                    break;
×
325
                }
326
                
327
                // get real child name sql
328
                if (replaceChildTblName(g_queryInfo.superQueryInfo.sql[j], sqlstr, i)) {
7✔
329
                    // fault
NEW
330
                    tmfree(sqlstr);
×
331
                    return NULL;
7✔
332
                }
333

334
                if (g_queryInfo.superQueryInfo.result[j][0] != '\0') {
7✔
335
                    snprintf(pThreadInfo->filePath, MAX_PATH_LEN, "%s-%d",
7✔
336
                            g_queryInfo.superQueryInfo.result[j],
7✔
337
                            pThreadInfo->threadID);
338
                }
339

340
                // execute sql
341
                uint64_t s = toolsGetTimestampUs();
7✔
342
                int ret = selectAndGetResult(pThreadInfo, sqlstr, true);
7✔
343
                if (ret) {
7✔
344
                    // found error
345
                    errorPrint("failed call stb selectAndGetResult, i=%d j=%d\n", i, j);
7✔
346
                    g_fail = true;
7✔
347
                    tmfree(sqlstr);
7✔
348
                    return NULL;
7✔
349
                }
NEW
350
                uint64_t delay = toolsGetTimestampUs() - s;
×
NEW
351
                debugPrint("%s() LN%d, delay: %"PRIu64"\n", __func__, __LINE__, delay);
×
NEW
352
                if (ret == 0) {
×
353
                    // only succ add delay list
NEW
354
                    benchArrayPushNoFree(pThreadInfo->query_delay_list, &delay);
×
NEW
355
                    pThreadInfo->total_delay += delay;
×
356
                }
357

358
                // show real QPS
NEW
359
                lastPrintTime = showRealQPS(pThreadInfo, lastPrintTime, startTs);
×
360
            }
361
        }
UNCOV
362
        et = toolsGetTimestampMs();
×
363

364
        // sleep
NEW
365
        autoSleep(et - st);
×
366
    }
UNCOV
367
    tmfree(sqlstr);
×
368

UNCOV
369
    return NULL;
×
370
}
371

372
//
373
// ---------------------------------  first level functions ------------------------------
374
//
375

376
/*
 * Merge the per-thread latency lists and print the overall statistics
 * (min / avg / p90 / p95 / p99 / max).
 *
 * infos      array of threadCnt thread descriptors; each non-NULL
 *            query_delay_list is appended to the merged list, destroyed,
 *            and the field is reset to NULL
 * threadCnt  number of entries in infos
 * spend      elapsed time of the whole run; printed as spend / 1E6 seconds
 *
 * Does nothing when infos is NULL or threadCnt is 0. Logs an error and
 * returns when no latency was recorded.
 */
void totalChildQuery(qThreadInfo* infos, int threadCnt, int64_t spend) {
    // valid check
    if (infos == NULL || threadCnt == 0) {
        return;
    }

    // statistic
    BArray *delay_list   = benchArrayInit(1, sizeof(int64_t));
    double  total_delays = 0;

    // collect and release the per-thread lists
    for (int i = 0; i < threadCnt; ++i) {
        qThreadInfo *pThreadInfo = infos + i;
        if (pThreadInfo->query_delay_list == NULL) {
            continue;
        }

        // append delay
        benchArrayAddBatch(delay_list, pThreadInfo->query_delay_list->pData,
                pThreadInfo->query_delay_list->size, false);
        total_delays += pThreadInfo->total_delay;

        // free delay
        benchArrayDestroy(pThreadInfo->query_delay_list);
        pThreadInfo->query_delay_list = NULL;
    }

    // succ is zero
    if (delay_list->size == 0) {
        errorPrint("%s", "succ queries count is zero.\n");
        benchArrayDestroy(delay_list);
        return;
    }

    // sort ascending so percentiles can be read by index
    qsort(delay_list->pData, delay_list->size, delay_list->elemSize, compare);

    // show delay min max (the list is known to be non-empty here)
    infoPrint(
            "spend %.6fs using "
            "%d threads complete query %d times,  "
            "min delay: %.6fs, "
            "avg delay: %.6fs, "
            "p90: %.6fs, "
            "p95: %.6fs, "
            "p99: %.6fs, "
            "max: %.6fs\n",
            spend/1E6,
            threadCnt, (int)delay_list->size,
            *(int64_t *)(benchArrayGet(delay_list, 0))/1E6,
            (double)total_delays/delay_list->size/1E6,
            *(int64_t *)(benchArrayGet(delay_list,
                                (int32_t)(delay_list->size * 0.9)))/1E6,
            *(int64_t *)(benchArrayGet(delay_list,
                                (int32_t)(delay_list->size * 0.95)))/1E6,
            *(int64_t *)(benchArrayGet(delay_list,
                                (int32_t)(delay_list->size * 0.99)))/1E6,
            *(int64_t *)(benchArrayGet(delay_list,
                                (int32_t)(delay_list->size - 1)))/1E6);

    benchArrayDestroy(delay_list);
}
444

445
//
446
// super table query
447
//
448
static int stbQuery(uint16_t iface, char* dbName) {
3✔
449
    int ret = -1;
3✔
450
    pthread_t * pidsOfSub   = NULL;
3✔
451
    qThreadInfo *threadInfos = NULL;
3✔
452
    g_queryInfo.superQueryInfo.totalQueried = 0;
3✔
453
    g_queryInfo.superQueryInfo.totalFail    = 0;
3✔
454

455
    // check
456
    if ((g_queryInfo.superQueryInfo.sqlCount == 0)
3✔
457
        || (g_queryInfo.superQueryInfo.threadCnt == 0)) {
3✔
UNCOV
458
        return 0;
×
459
    }
460

461
    // malloc 
462
    pidsOfSub = benchCalloc(1, g_queryInfo.superQueryInfo.threadCnt
3✔
463
                            *sizeof(pthread_t),
464
                            false);
465
    threadInfos = benchCalloc(1, g_queryInfo.superQueryInfo.threadCnt
3✔
466
                                *sizeof(qThreadInfo), false);
467

468
    int64_t ntables = g_queryInfo.superQueryInfo.childTblCount;
3✔
469
    int nConcurrent = g_queryInfo.superQueryInfo.threadCnt;
3✔
470

471
    int64_t a = ntables / nConcurrent;
3✔
472
    if (a < 1) {
3✔
NEW
473
        nConcurrent = (int)ntables;
×
NEW
474
        a = 1;
×
475
    }
476

477
    int64_t b = 0;
3✔
478
    if (nConcurrent != 0) {
3✔
479
        b = ntables % nConcurrent;
3✔
480
    }
481

482
    uint64_t tableFrom = 0;
3✔
483
    int threadCnt = 0;
3✔
484
    for (int i = 0; i < nConcurrent; i++) {
10✔
485
        qThreadInfo *pThreadInfo = threadInfos + i;
7✔
486
        pThreadInfo->threadID = i;
7✔
487
        pThreadInfo->start_table_from = tableFrom;
7✔
488
        pThreadInfo->ntables = i < b ? a + 1 : a;
7✔
489
        pThreadInfo->end_table_to =
7✔
490
                i < b ? tableFrom + a : tableFrom + a - 1;
7✔
491
        tableFrom = pThreadInfo->end_table_to + 1;
7✔
492
        // create conn
493
        if (initQueryConn(pThreadInfo, iface)){
7✔
NEW
494
            break;
×
495
        }
496
        int code = pthread_create(pidsOfSub + i, NULL, stbQueryThread, pThreadInfo);
7✔
497
        if (code != 0) {
7✔
NEW
498
            errorPrint("failed stbQueryThread create. error code =%d \n", code);
×
NEW
499
            break;
×
500
        }
501
        threadCnt ++;
7✔
502
    }
503

504
    bool needExit = false;
3✔
505
    // if failed, set termainte flag true like ctrl+c exit
506
    if (threadCnt != nConcurrent  ) {
3✔
NEW
507
        needExit = true;
×
NEW
508
        g_arguments->terminate = true;
×
NEW
509
        goto OVER;
×
510
    }
511

512
    // reset total
513
    g_queryInfo.superQueryInfo.totalQueried = 0;
3✔
514
    g_queryInfo.superQueryInfo.totalFail    = 0;
3✔
515

516
    // real thread count
517
    g_queryInfo.superQueryInfo.threadCnt = threadCnt;
3✔
518
    int64_t start = toolsGetTimestampUs();
3✔
519

520
    for (int i = 0; i < threadCnt; i++) {
10✔
521
        pthread_join(pidsOfSub[i], NULL);
7✔
522
        qThreadInfo *pThreadInfo = threadInfos + i;
7✔
523
        // add succ
524
        g_queryInfo.superQueryInfo.totalQueried += pThreadInfo->nSucc;
7✔
525
        if (g_arguments->continueIfFail == YES_IF_FAILED) {
7✔
526
            // "yes" need add fail cnt
NEW
527
            g_queryInfo.superQueryInfo.totalQueried += pThreadInfo->nFail;
×
NEW
528
            g_queryInfo.superQueryInfo.totalFail    += pThreadInfo->nFail;
×
529
        }
530

531
        // close conn
532
        closeQueryConn(pThreadInfo, iface);
7✔
533
    }
534
    int64_t end = toolsGetTimestampUs();
3✔
535

536
    if (needExit) {
3✔
NEW
537
        goto OVER;
×
538
    }
539

540
    // total show
541
    totalChildQuery(threadInfos, threadCnt, end - start);
3✔
542

543
    ret = 0;
3✔
544

545
OVER:
3✔
546
    tmfree((char *)pidsOfSub);
3✔
547
    tmfree((char *)threadInfos);
3✔
548

549
    for (int64_t i = 0; i < g_queryInfo.superQueryInfo.childTblCount; ++i) {
26✔
550
        tmfree(g_queryInfo.superQueryInfo.childTblName[i]);
23✔
551
    }
552
    tmfree(g_queryInfo.superQueryInfo.childTblName);
3✔
553
    return ret;
3✔
554
}
555

556
//
557
// specQuery
558
//
559
static int specQuery(uint16_t iface, char* dbName) {
6✔
560
    int ret = -1;
6✔
561
    pthread_t    *pids = NULL;
6✔
562
    qThreadInfo *infos = NULL;
6✔
563
    int    nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent;
6✔
564
    uint64_t nSqlCount = g_queryInfo.specifiedQueryInfo.sqls->size;
6✔
565
    g_queryInfo.specifiedQueryInfo.totalQueried = 0;
6✔
566
    g_queryInfo.specifiedQueryInfo.totalFail    = 0;
6✔
567

568
    // check invaid
569
    if(nSqlCount == 0 || nConcurrent == 0 ) {
6✔
UNCOV
570
        if(nSqlCount == 0)
×
UNCOV
571
           warnPrint("specified table query sql count is %" PRIu64 ".\n", nSqlCount);
×
UNCOV
572
        if(nConcurrent == 0)
×
NEW
573
           warnPrint("nConcurrent is %d , specified_table_query->nConcurrent is zero. \n", nConcurrent);
×
UNCOV
574
        return 0;
×
575
    }
576

577
    // malloc threads memory
578
    pids  = benchCalloc(1, nConcurrent * sizeof(pthread_t),  false);
6✔
579
    infos = benchCalloc(1, nConcurrent * sizeof(qThreadInfo), false);
6✔
580

581
    for (uint64_t i = 0; i < nSqlCount; i++) {
18✔
582
        if( g_arguments->terminate ) {
12✔
NEW
583
            break;
×
584
        }
585

586
        // reset
587
        memset(pids,  0, nConcurrent * sizeof(pthread_t));
12✔
588
        memset(infos, 0, nConcurrent * sizeof(qThreadInfo));
12✔
589

590
        // get execute sql
591
        SSQL *sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i);
12✔
592

593
        // create threads
594
        int threadCnt = 0;
12✔
595
        for (int j = 0; j < nConcurrent; j++) {
34✔
596
           qThreadInfo *pThreadInfo = infos + j;
22✔
597
           pThreadInfo->threadID = i * nConcurrent + j;
22✔
598
           pThreadInfo->querySeq = i;
22✔
599

600
           // create conn
601
           if (initQueryConn(pThreadInfo, iface)) {
22✔
NEW
602
               break;
×
603
           }
604

605
           int code = pthread_create(pids + j, NULL, specQueryThread, pThreadInfo);
22✔
606
           if (code != 0) {
22✔
NEW
607
               errorPrint("failed specQueryThread create. error code =%d \n", code);
×
NEW
608
               break;
×
609
           }
610
           threadCnt++;
22✔
611
        }
612

613
        bool needExit = false;
12✔
614
        // if failed, set termainte flag true like ctrl+c exit
615
        if (threadCnt != nConcurrent  ) {
12✔
NEW
616
            needExit = true;
×
UNCOV
617
            g_arguments->terminate = true;
×
618
        }
619

620
        int64_t start = toolsGetTimestampUs();
12✔
621
        // wait threads execute finished one by one
622
        for (int j = 0; j < threadCnt ; j++) {
34✔
623
           pthread_join(pids[j], NULL);
22✔
624
           qThreadInfo *pThreadInfo = infos + j;
22✔
625
           closeQueryConn(pThreadInfo, iface);
22✔
626

627
           // need exit in loop
628
           if (needExit) {
22✔
629
                // free BArray
NEW
630
                benchArrayDestroy(pThreadInfo->query_delay_list);
×
UNCOV
631
                pThreadInfo->query_delay_list = NULL;
×
632
           }
633
        }
634
        int64_t spend = toolsGetTimestampUs() - start;
12✔
635
        if(spend == 0) {
12✔
636
            // avoid xx/spend expr throw error
NEW
637
            spend = 1;
×
638
        }
639

640
        // create 
641
        if (needExit) {
12✔
NEW
642
            errorPrint("failed to create thread. expect nConcurrent=%d real threadCnt=%d,  exit testing.\n", nConcurrent, threadCnt);
×
NEW
643
            goto OVER;
×
644
        }
645

646
        //
647
        // show QPS and P90 ...
648
        //
649
        uint64_t n = 0;
12✔
650
        double  total_delays = 0.0;
12✔
651
        uint64_t totalQueried = 0;
12✔
652
        uint64_t totalFail    = 0;
12✔
653
        for (int j = 0; j < threadCnt; j++) {
34✔
654
           qThreadInfo *pThreadInfo = infos + j;
22✔
655
           if(pThreadInfo->query_delay_list == NULL) {
22✔
NEW
656
                continue;;
×
657
           }
658
           
659
           // total one sql
660
           for (uint64_t k = 0; k < pThreadInfo->query_delay_list->size; k++) {
680✔
661
                int64_t * delay = benchArrayGet(pThreadInfo->query_delay_list, k);
658✔
662
                sql->delay_list[n++] = *delay;
658✔
663
                total_delays += *delay;
658✔
664
           }
665

666
           // total queries
667
           totalQueried += pThreadInfo->nSucc;
22✔
668
           if (g_arguments->continueIfFail == YES_IF_FAILED) {
22✔
669
                totalQueried += pThreadInfo->nFail;
7✔
670
                totalFail    += pThreadInfo->nFail;
7✔
671
           }
672

673
           // free BArray query_delay_list
674
           benchArrayDestroy(pThreadInfo->query_delay_list);
22✔
675
           pThreadInfo->query_delay_list = NULL;
22✔
676
        }
677

678
        // appand current sql
679
        g_queryInfo.specifiedQueryInfo.totalQueried += totalQueried;
12✔
680
        g_queryInfo.specifiedQueryInfo.totalFail    += totalFail;
12✔
681

682
        // succ is zero
683
        if(totalQueried == 0 || n == 0) {
12✔
NEW
684
            errorPrint("%s", "succ queries count is zero.\n");
×
NEW
685
            goto OVER;
×
686
        }
687

688
        qsort(sql->delay_list, n, sizeof(uint64_t), compare);
12✔
689
        int32_t bufLen = strlen(sql->command) + 512;
12✔
690
        char * buf = benchCalloc(bufLen, sizeof(char), false);
12✔
691
        snprintf(buf , bufLen, "complete query with %d threads and %" PRIu64 " "
12✔
692
                             "sql %"PRIu64" spend %.6fs QPS: %.3f "
693
                             "query delay "
694
                             "avg: %.6fs "
695
                             "min: %.6fs "
696
                             "max: %.6fs "
697
                             "p90: %.6fs "
698
                             "p95: %.6fs "
699
                             "p99: %.6fs "
700
                             "SQL command: %s \n",
701
                             threadCnt, totalQueried,
702
                             i + 1, spend/1E6, totalQueried / (spend/1E6),
12✔
703
                             total_delays/n/1E6,           /* avg */
12✔
704
                             sql->delay_list[0] / 1E6,     /* min */
12✔
705
                             sql->delay_list[n - 1] / 1E6, /* max */
12✔
706
                             /*  p90 */
707
                             sql->delay_list[(uint64_t)(n * 0.90)] / 1E6,
12✔
708
                             /*  p95 */
709
                             sql->delay_list[(uint64_t)(n * 0.95)] / 1E6,
12✔
710
                             /*  p99 */
711
                             sql->delay_list[(uint64_t)(n * 0.99)] / 1E6, 
12✔
712
                             sql->command);
713

714
        infoPrintNoTimestamp("%s", buf);
12✔
715
        infoPrintNoTimestampToFile("%s", buf);
12✔
716
        tmfree(buf);
12✔
717
    }
718
    ret = 0;
6✔
719

720
OVER:
6✔
721
    tmfree((char *)pids);
6✔
722
    tmfree((char *)infos);
6✔
723

724
    // free specialQueryInfo
725
    freeSpecialQueryInfo();
6✔
726
    return ret;
6✔
727
}
728

729
//
730
// specQueryMix
731
//
732
static int specQueryMix(uint16_t iface, char* dbName) {
3✔
733
    // init
734
    int ret            = -1;
3✔
735
    int nConcurrent    = g_queryInfo.specifiedQueryInfo.concurrent;
3✔
736
    pthread_t * pids   = benchCalloc(nConcurrent, sizeof(pthread_t), true);
3✔
737
    qThreadInfo *infos = benchCalloc(nConcurrent, sizeof(qThreadInfo), true);
3✔
738

739
    // concurent calc
740
    int total_sql_num = g_queryInfo.specifiedQueryInfo.sqls->size;
3✔
741
    int start_sql     = 0;
3✔
742
    int a             = total_sql_num / nConcurrent;
3✔
743
    if (a < 1) {
3✔
744
        warnPrint("sqls num:%d < concurent:%d, so set concurrent to %d\n", total_sql_num, nConcurrent, nConcurrent);
2✔
745
        nConcurrent = total_sql_num;
2✔
746
        a = 1;
2✔
747
    }
748
    int b = 0;
3✔
749
    if (nConcurrent != 0) {
3✔
750
        b = total_sql_num % nConcurrent;
3✔
751
    }
752

753
    //
754
    // running
755
    //
756
    int threadCnt = 0;
3✔
757
    for (int i = 0; i < nConcurrent; ++i) {
9✔
758
        qThreadInfo *pThreadInfo = infos + i;
6✔
759
        pThreadInfo->threadID    = i;
6✔
760
        pThreadInfo->start_sql   = start_sql;
6✔
761
        pThreadInfo->end_sql     = i < b ? start_sql + a : start_sql + a - 1;
6✔
762
        start_sql = pThreadInfo->end_sql + 1;
6✔
763
        pThreadInfo->total_delay = 0;
6✔
764

765
        // create conn
766
        if (initQueryConn(pThreadInfo, iface)){
6✔
NEW
767
            break;
×
768
        }
769
        // main run
770
        int code = pthread_create(pids + i, NULL, specQueryMixThread, pThreadInfo);
6✔
771
        if (code != 0) {
6✔
NEW
772
            errorPrint("failed specQueryMixThread create. error code =%d \n", code);
×
NEW
773
            break;
×
774
        }
775
        
776
        threadCnt ++;
6✔
777
    }
778

779
    bool needExit = false;
3✔
780
    // if failed, set termainte flag true like ctrl+c exit
781
    if (threadCnt != nConcurrent) {
3✔
NEW
782
        needExit = true;
×
NEW
783
        g_arguments->terminate = true;
×
784
    }
785

786
    // reset total
787
    g_queryInfo.specifiedQueryInfo.totalQueried = 0;
3✔
788
    g_queryInfo.specifiedQueryInfo.totalFail    = 0;
3✔
789

790
    int64_t start = toolsGetTimestampUs();
3✔
791
    for (int i = 0; i < threadCnt; ++i) {
9✔
792
        pthread_join(pids[i], NULL);
6✔
793
        qThreadInfo *pThreadInfo = infos + i;
6✔
794
        closeQueryConn(pThreadInfo, iface);
6✔
795

796
        // total queries
797
        g_queryInfo.specifiedQueryInfo.totalQueried += pThreadInfo->nSucc;
6✔
798
        if (g_arguments->continueIfFail == YES_IF_FAILED) {
6✔
799
            // yes need add failed count
800
            g_queryInfo.specifiedQueryInfo.totalQueried += pThreadInfo->nFail;
2✔
801
            g_queryInfo.specifiedQueryInfo.totalFail    += pThreadInfo->nFail;
2✔
802
        }
803

804
        // destory
805
        if (needExit) {
6✔
NEW
806
            benchArrayDestroy(pThreadInfo->query_delay_list);
×
NEW
807
            pThreadInfo->query_delay_list = NULL;
×
808
        }
809
    }
810
    int64_t end = toolsGetTimestampUs();
3✔
811

812
    // create 
813
    if (needExit) {
3✔
NEW
814
        errorPrint("failed to create thread. expect nConcurrent=%d real threadCnt=%d,  exit testing.\n", nConcurrent, threadCnt);
×
NEW
815
        goto OVER;
×
816
    }
817

818
    // statistic
819
    totalChildQuery(infos, threadCnt, end - start);
3✔
820
    ret = 0;
3✔
821

822
OVER:
3✔
823
    tmfree(pids);
3✔
824
    tmfree(infos);
3✔
825

826
    // free sqls
827
    freeSpecialQueryInfo();
3✔
828

829
    return ret;
3✔
830
}
831

832
// total query for end 
833
void totalQuery(int64_t spends) {
12✔
834
    // total QPS
835
    uint64_t totalQueried = g_queryInfo.specifiedQueryInfo.totalQueried
12✔
836
        + g_queryInfo.superQueryInfo.totalQueried;
12✔
837

838
    // error rate
839
    char errRate[128] = "";
12✔
840
    if(g_arguments->continueIfFail == YES_IF_FAILED) {
12✔
841
        uint64_t totalFail = g_queryInfo.specifiedQueryInfo.totalFail + g_queryInfo.superQueryInfo.totalFail;
3✔
842
        if (totalQueried > 0) {
3✔
843
            snprintf(errRate, sizeof(errRate), " ,error %" PRIu64 " (rate:%.3f%%)", totalFail, ((float)totalFail * 100)/totalQueried);
3✔
844
        }
845
    }
846

847
    // show
848
    double  tInS = (double)spends / 1000;
12✔
849
    char buf[512] = "";
12✔
850
    snprintf(buf, sizeof(buf),
12✔
851
                "Spend %.4f second completed total queries: %" PRIu64
852
                ", the QPS of all threads: %10.3f%s\n\n",
853
                tInS, totalQueried, (double)totalQueried / tInS, errRate);
12✔
854
    infoPrint("%s", buf);
12✔
855
    infoPrintToFile("%s", buf);
12✔
856
}
12✔
857

858
int queryTestProcess() {
12✔
859
    prompt(0);
12✔
860

861
    if (REST_IFACE == g_queryInfo.iface) {
12✔
UNCOV
862
        encodeAuthBase64();
×
863
    }
864

865
    // kill sql for executing seconds over "kill_slow_query_threshold"
866
    if (g_queryInfo.iface == TAOSC_IFACE && g_queryInfo.killQueryThreshold) {
12✔
NEW
867
        int32_t ret = killSlowQuery();
×
NEW
868
        if (ret != 0) {
×
NEW
869
            return ret;
×
870
        }
871
    }
872

873
    // covert addr
874
    if (g_queryInfo.iface == REST_IFACE) {
12✔
UNCOV
875
        if (convertHostToServAddr(g_arguments->host,
×
UNCOV
876
                    g_arguments->port + TSDB_PORT_HTTP,
×
UNCOV
877
                    &(g_arguments->serv_addr)) != 0) {
×
878
            errorPrint("%s", "convert host to server address\n");
×
879
            return -1;
×
880
        }
881
    }
882

883
    // fetch child name if super table
884
    if ((g_queryInfo.superQueryInfo.sqlCount > 0) &&
12✔
885
            (g_queryInfo.superQueryInfo.threadCnt > 0)) {
3✔
886
        int32_t ret = fetchChildTableName(g_queryInfo.dbName, g_queryInfo.superQueryInfo.stbName);
3✔
887
        if (ret != 0) {
3✔
NEW
888
            errorPrint("fetchChildTableName dbName=%s stb=%s failed.", g_queryInfo.dbName, g_queryInfo.superQueryInfo.stbName);
×
UNCOV
889
            return -1;
×
890
        }
891
    }
892

893
    // 
894
    // start running
895
    //
896

897
    
898
    uint64_t startTs = toolsGetTimestampMs();
12✔
899
    if(g_queryInfo.specifiedQueryInfo.sqls && g_queryInfo.specifiedQueryInfo.sqls->size > 0) {
12✔
900
        // specified table
901
        if (g_queryInfo.specifiedQueryInfo.mixed_query) {
9✔
902
            // mixed
903
            if (specQueryMix(g_queryInfo.iface, g_queryInfo.dbName)) {
3✔
NEW
904
                return -1;
×
905
            }
906
        } else {
907
            // no mixied
908
            if (specQuery(g_queryInfo.iface, g_queryInfo.dbName)) {
6✔
UNCOV
909
                return -1;
×
910
            }
911
        }
912
    } else if(g_queryInfo.superQueryInfo.sqlCount > 0) {
3✔
913
        // super table
914
        if (stbQuery(g_queryInfo.iface, g_queryInfo.dbName)) {
3✔
UNCOV
915
            return -1;
×
916
        }
917
    } else {
918
        // nothing
NEW
919
        errorPrint("%s\n", "Both 'specified_table_query' and 'super_table_query' sqls is empty.");
×
UNCOV
920
        return -1;
×
921
    }
922

923
    // total 
924
    totalQuery(toolsGetTimestampMs() - startTs); 
12✔
925
    return 0;
12✔
926
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc