• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / taos-tools / 12879645482

21 Jan 2025 03:35AM UTC coverage: 75.116% (-0.1%) from 75.249%
12879645482

Pull #839

github

web-flow
Merge abad4de95 into cde537a70
Pull Request #839: FIX mixed query mode no need calc thread

396 of 546 new or added lines in 4 files covered. (72.53%)

34 existing lines in 7 files now uncovered.

12458 of 16585 relevant lines covered (75.12%)

328703.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.75
/src/benchQuery.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the MIT license as published by the Free Software
6
 * Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 */
12

13
#include <bench.h>
14
#include "benchLog.h"
15

16
// query and get result  other true is no test sql
17
int selectAndGetResult(qThreadInfo *pThreadInfo, char *command, bool other) {
6,320✔
18
    int ret = 0;
6,320✔
19

20
    // user cancel
21
    if (g_arguments->terminate) {
6,320✔
22
        return -1;
×
23
    }
24

25
    // execute sql
26
    uint32_t threadID = pThreadInfo->threadID;
6,320✔
27
    char dbName[TSDB_DB_NAME_LEN] = {0};
6,320✔
28
    tstrncpy(dbName, g_queryInfo.dbName, TSDB_DB_NAME_LEN);
6,320✔
29

30
    if (g_queryInfo.iface == REST_IFACE) {
6,320✔
31
        int retCode = postProceSql(command, g_queryInfo.dbName, 0, REST_IFACE,
36✔
32
                                   0, g_arguments->port, false,
36✔
33
                                   pThreadInfo->sockfd, pThreadInfo->filePath);
36✔
34
        if (0 != retCode) {
36✔
NEW
35
            errorPrint("====restful return fail, threadID[%u]\n", threadID);
×
UNCOV
36
            ret = -1;
×
37
        }
38
    } else {
39
        // query
40
        TAOS *taos = pThreadInfo->conn->taos;
6,284✔
41
        int64_t rows  = 0;
6,284✔
42
        TAOS_RES *res = taos_query(taos, command);
6,284✔
43
        int code = taos_errno(res);
6,284✔
44
        if (res == NULL || code) {
6,284✔
45
            // failed query
46
            errorPrint("failed to execute sql:%s, "
10✔
47
                        "code: 0x%08x, reason:%s\n",
48
                        command, code, taos_errstr(res));
49
            ret = -1;
9✔
50
        } else {
51
            // succ query
52
            if (!other)
6,274✔
53
                rows = fetchResult(res, pThreadInfo->filePath);
6,268✔
54
        }
55

56
        // free result
57
        if (res) {
6,283✔
58
            taos_free_result(res);
6,284✔
59
        }
60
        debugPrint("query sql:%s rows:%"PRId64"\n", command, rows);
6,284✔
61
    }
62

63
    // record count
64
    if (ret ==0) {
6,320✔
65
        // succ
66
        if (!other) 
6,311✔
67
            pThreadInfo->nSucc ++;
6,303✔
68
    } else {
69
        // fail
70
        if (!other)
9✔
71
            pThreadInfo->nFail ++;
9✔
72

73
        // continue option
74
        if (YES_IF_FAILED == g_arguments->continueIfFail) {
9✔
75
            ret = 0; // force continue
3✔
76
        }
77
    }
78

79
    return ret;
6,320✔
80
}
81

82
// interlligent sleep
83
void autoSleep(uint64_t st, uint64_t et) {
323✔
84
    if (g_queryInfo.specifiedQueryInfo.queryInterval &&
323✔
85
        (et - st) < (int64_t)g_queryInfo.specifiedQueryInfo.queryInterval * 1000) {
182✔
86
        toolsMsleep((int32_t)(
19✔
87
                    g_queryInfo.specifiedQueryInfo.queryInterval*1000
19✔
88
                    - (et - st)));  // ms
19✔
89
    }
90
}
323✔
91

92
// reset 
93
int32_t resetQueryCache(qThreadInfo* pThreadInfo) {    
7✔
94
    // execute sql 
95
    if (selectAndGetResult(pThreadInfo, "RESET QUERY CACHE", true)) {
7✔
NEW
96
        errorPrint("%s() LN%d, reset query cache failed\n", __func__, __LINE__);
×
NEW
97
        return -1;
×
98
    }
99
    // succ
100
    return 0;
7✔
101
}
102

103

104

105
//
106
//  ---------------------------------  second-level functions for threads -----------------------------------
107
//
108

109
// show rela qps
110
int64_t showRealQPS(qThreadInfo* pThreadInfo, int64_t lastPrintTime, int64_t startTs) {
6,304✔
111
    int64_t now = toolsGetTimestampMs();
6,304✔
112
    if (now - lastPrintTime > 10 * 1000) {
6,305✔
113
        // real total
NEW
114
        uint64_t totalQueried = pThreadInfo->nSucc;
×
NEW
115
        if(g_arguments->continueIfFail == YES_IF_FAILED) {
×
NEW
116
            totalQueried += pThreadInfo->nFail;
×
117
        }
NEW
118
        infoPrint(
×
119
            "thread[%d] has currently completed queries: %" PRIu64 ", QPS: %10.3f\n",
120
            pThreadInfo->threadID, totalQueried,
121
            (double)(totalQueried / ((now - startTs) / 1000.0)));
NEW
122
        return now;
×
123
    } else {
124
        return lastPrintTime;
6,306✔
125
    }    
126
}
127

128
// spec query mixed thread
129
static void *specQueryMixThread(void *sarg) {
4✔
130
    qThreadInfo *pThreadInfo = (qThreadInfo*)sarg;
4✔
131
#ifdef LINUX
132
    prctl(PR_SET_NAME, "specQueryMixThread");
4✔
133
#endif
134
    // use db
135
    if (g_queryInfo.dbName) {
4✔
136
        if (pThreadInfo->conn &&
4✔
137
            pThreadInfo->conn->taos &&
8✔
138
            taos_select_db(pThreadInfo->conn->taos, g_queryInfo.dbName)) {
4✔
NEW
139
                errorPrint("thread[%d]: failed to select database(%s)\n", pThreadInfo->threadID, g_queryInfo.dbName);
×
140
                return NULL;
×
141
        }
142
    }
143

144
    int64_t st = 0;
4✔
145
    int64_t et = 0;
4✔
146
    int64_t startTs = toolsGetTimestampMs();
4✔
147
    int64_t lastPrintTime = startTs;
4✔
148
    uint64_t  queryTimes = g_queryInfo.specifiedQueryInfo.queryTimes;
4✔
149
    pThreadInfo->query_delay_list = benchArrayInit(queryTimes, sizeof(int64_t));
4✔
150
    for (int i = pThreadInfo->start_sql; i <= pThreadInfo->end_sql; ++i) {
8✔
151
        SSQL * sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i);
4✔
152
        for (int j = 0; j < queryTimes; ++j) {
28✔
153
            // use cancel
154
            if(g_arguments->terminate) {
24✔
NEW
155
                infoPrint("%s\n", "user cancel , so exit testing.");
×
NEW
156
                break;
×
157
            }
158

159
            // reset cache
160
            if (g_queryInfo.reset_query_cache) {
24✔
NEW
161
                if (resetQueryCache(pThreadInfo)) {
×
NEW
162
                    errorPrint("%s() LN%d, reset query cache failed\n", __func__, __LINE__);
×
UNCOV
163
                    return NULL;
×
164
                }
165
            }
166

167
            // execute sql
168
            st = toolsGetTimestampUs();
24✔
169
            int ret = selectAndGetResult(pThreadInfo, sql->command, false);
24✔
170
            if (ret) {
24✔
NEW
171
                g_fail = true;
×
NEW
172
                errorPrint("failed call mix selectAndGetResult, i=%d j=%d", i, j);
×
NEW
173
                return NULL;
×
174
            }
175
            et = toolsGetTimestampUs();
24✔
176

177
            // sleep
178
            autoSleep(st, et);
24✔
179

180
            // delay
181
            if (ret == 0) {
24✔
182
                int64_t* delay = benchCalloc(1, sizeof(int64_t), false);
24✔
183
                *delay = et - st;
24✔
184
                debugPrint("%s() LN%d, delay: %"PRId64"\n", __func__, __LINE__, *delay);
24✔
185

186
                pThreadInfo->total_delay += *delay;
24✔
187
                if(benchArrayPush(pThreadInfo->query_delay_list, delay) == NULL){
24✔
NEW
188
                    tmfree(delay);
×
189
                }
190
            }
191

192
            // real show
193
            lastPrintTime = showRealQPS(pThreadInfo, lastPrintTime, startTs);
24✔
194
        }
195
    }
196

197
    return NULL;
4✔
198
}
199

200
// spec query thread
201
static void *specQueryThread(void *sarg) {
111✔
202
    qThreadInfo *pThreadInfo = (qThreadInfo *)sarg;
111✔
203
#ifdef LINUX
204
    prctl(PR_SET_NAME, "specQueryThread");
111✔
205
#endif
206
    uint64_t st = 0;
111✔
207
    uint64_t et = 0;
111✔
208
    int32_t  index = 0;
111✔
209

210
    // use db
211
    if (g_queryInfo.dbName) {
111✔
212
        if (pThreadInfo->conn &&
111✔
213
            pThreadInfo->conn->taos &&
196✔
214
            taos_select_db(pThreadInfo->conn->taos, g_queryInfo.dbName)) {
98✔
215
                errorPrint("thread[%d]: failed to select database(%s)\n", pThreadInfo->threadID, g_queryInfo.dbName);
×
216
                return NULL;
×
217
        }
218
    }
219

220
    uint64_t  queryTimes = g_queryInfo.specifiedQueryInfo.queryTimes;
111✔
221
    pThreadInfo->query_delay_list = benchArrayInit(queryTimes, sizeof(int64_t));
111✔
222

223
    uint64_t  startTs       = toolsGetTimestampMs();
111✔
224
    uint64_t  lastPrintTime = startTs;
111✔
225

226
    SSQL * sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, pThreadInfo->querySeq);
111✔
227

228
    if (sql->result[0] != '\0') {
111✔
229
        snprintf(pThreadInfo->filePath, MAX_PATH_LEN, "%s-%d",
102✔
230
                sql->result, pThreadInfo->threadID);
102✔
231
    }
232

233
    while (index < queryTimes) {
330✔
234
        // use cancel
235
        if(g_arguments->terminate) {
225✔
NEW
236
            infoPrint("thread[%d] user cancel , so exit testing.\n", pThreadInfo->threadID);
×
237
            break;
6✔
238
        }
239

240
        // reset cache
241
        if (g_queryInfo.reset_query_cache) {
225✔
242
            if (resetQueryCache(pThreadInfo)) {
6✔
NEW
243
                errorPrint("%s() LN%d, reset query cache failed\n", __func__, __LINE__);
×
UNCOV
244
                return NULL;
×
245
            }
246
        }
247

248
        // execute sql
249
        st = toolsGetTimestampUs();
225✔
250
        int ret = selectAndGetResult(pThreadInfo, sql->command, false);
225✔
251
        if (ret) {
225✔
252
            g_fail = true;
6✔
253
            errorPrint("failed call spec selectAndGetResult, index=%d\n", index);
6✔
254
            break;
6✔
255
        }
256
        et = toolsGetTimestampUs();
219✔
257

258
        // sleep
259
        autoSleep(st, et);
219✔
260

261
        uint64_t delay = et - st;
219✔
262
        debugPrint("%s() LN%d, delay: %"PRIu64"\n", __func__, __LINE__, delay);
219✔
263

264
        if (ret == 0) {
219✔
265
            // only succ add delay list
266
            benchArrayPushNoFree(pThreadInfo->query_delay_list, &delay);
219✔
267
            pThreadInfo->total_delay += delay;
219✔
268
        }
269
        index++;
219✔
270

271
        // real show
272
        lastPrintTime = showRealQPS(pThreadInfo, lastPrintTime, startTs);
219✔
273
    }
274

275
    return NULL;
111✔
276
}
277

278
// super table query thread
279
static void *stbQueryThread(void *sarg) {
34✔
280
    char *sqlstr = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, false);
34✔
281
    qThreadInfo *pThreadInfo = (qThreadInfo *)sarg;
34✔
282
#ifdef LINUX
283
    prctl(PR_SET_NAME, "stbQueryThread");
34✔
284
#endif
285

286
    uint64_t st = 0;
34✔
287
    uint64_t et = 0;
34✔
288

289
    uint64_t queryTimes = g_queryInfo.superQueryInfo.queryTimes;
34✔
290
    pThreadInfo->query_delay_list = benchArrayInit(queryTimes, sizeof(uint64_t));
34✔
291
    
292
    uint64_t startTs = toolsGetTimestampMs();
34✔
293
    uint64_t lastPrintTime = startTs;
34✔
294
    while (queryTimes--) {
114✔
295
        // use cancel
296
        if(g_arguments->terminate) {
80✔
NEW
297
            infoPrint("%s\n", "user cancel , so exit testing.");
×
NEW
298
            break;
×
299
        }
300

301
        // reset cache
302
        if (g_queryInfo.reset_query_cache) {
80✔
303
            if (resetQueryCache(pThreadInfo)) {
1✔
NEW
304
                errorPrint("%s() LN%d, reset query cache failed\n", __func__, __LINE__);
×
NEW
305
                return NULL;
×
306
            }
307
        }
308

309
        // execute
310
        st = toolsGetTimestampMs();
80✔
311
        // for each table
312
        for (int i = (int)pThreadInfo->start_table_from; i <= pThreadInfo->end_table_to; i++) {
204✔
313
            // use cancel
314
            if(g_arguments->terminate) {
124✔
NEW
315
                infoPrint("%s\n", "user cancel , so exit testing.");
×
NEW
316
                break;
×
317
            }
318

319
            // for each sql
320
            for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) {
6,188✔
321
                memset(sqlstr, 0, TSDB_MAX_ALLOWED_SQL_LEN);
6,064✔
322
                // use cancel
323
                if(g_arguments->terminate) {
6,064✔
NEW
324
                    infoPrint("%s\n", "user cancel , so exit testing.");
×
NEW
325
                    break;
×
326
                }
327

328
                
329
                // get real child name sql
330
                if (replaceChildTblName(g_queryInfo.superQueryInfo.sql[j], sqlstr, i)) {
6,064✔
331
                    // fault
NEW
332
                    tmfree(sqlstr);
×
NEW
333
                    return NULL;
×
334
                }
335

336
                if (g_queryInfo.superQueryInfo.result[j][0] != '\0') {
6,064✔
337
                    snprintf(pThreadInfo->filePath, MAX_PATH_LEN, "%s-%d",
6,064✔
338
                            g_queryInfo.superQueryInfo.result[j],
6,064✔
339
                            pThreadInfo->threadID);
340
                }
341

342
                // execute sql
343
                uint64_t s = toolsGetTimestampUs();
6,064✔
344
                int ret = selectAndGetResult(pThreadInfo, sqlstr, false);
6,063✔
345
                if (ret) {
6,064✔
346
                    // found error
NEW
347
                    errorPrint("failed call stb selectAndGetResult, i=%d j=%d\n", i, j);
×
UNCOV
348
                    g_fail = true;
×
NEW
349
                    tmfree(sqlstr);
×
NEW
350
                    return NULL;
×
351
                }
352
                uint64_t delay = toolsGetTimestampUs() - s;
6,064✔
353
                debugPrint("%s() LN%d, delay: %"PRIu64"\n", __func__, __LINE__, delay);
6,064✔
354
                if (ret == 0) {
6,064✔
355
                    // only succ add delay list
356
                    benchArrayPushNoFree(pThreadInfo->query_delay_list, &delay);
6,064✔
357
                    pThreadInfo->total_delay += delay;
6,061✔
358
                }
359

360
                // show real QPS
361
                lastPrintTime = showRealQPS(pThreadInfo, lastPrintTime, startTs);
6,061✔
362
            }
363
        }
364
        et = toolsGetTimestampMs();
80✔
365

366
        // sleep
367
        autoSleep(st, et);
80✔
368
    }
369
    tmfree(sqlstr);
34✔
370

371
    return NULL;
34✔
372
}
373

374
//
375
// ---------------------------------  first-level functions ------------------------------
376
//
377

378
// Merge the per-thread delay lists and print the overall latency summary
// (min, avg, p90, p95, p99, max).
//
// infos      array of threadCnt thread contexts; each non-NULL
//            query_delay_list is appended to the merged list, then destroyed
//            and set to NULL
// threadCnt  number of entries in infos
// spend      wall-clock time of the whole run; printed divided by 1E6
//
// Logs an error and prints no summary when no delay was recorded at all.
void totalChildQuery(qThreadInfo* infos, int threadCnt, int64_t spend) {
    // valid check
    if (infos == NULL || threadCnt == 0) {
        return ;
    }

    // statistic
    BArray * delay_list = benchArrayInit(1, sizeof(int64_t));
    double total_delays = 0;

    // collect and clear
    for (int i = 0; i < threadCnt; ++i) {
        qThreadInfo * pThreadInfo = infos + i;
        if(pThreadInfo->query_delay_list == NULL) {
            continue;
        }

        // append delay
        benchArrayAddBatch(delay_list, pThreadInfo->query_delay_list->pData,
                pThreadInfo->query_delay_list->size, false);
        total_delays += pThreadInfo->total_delay;

        // free delay
        benchArrayDestroy(pThreadInfo->query_delay_list);
        pThreadInfo->query_delay_list = NULL;
    }

    // succ is zero
    if (delay_list->size == 0) {
        errorPrint("%s", "succ queries count is zero.\n");
        benchArrayDestroy(delay_list);
        return ;
    }

    // sort so that percentiles can be read by index
    qsort(delay_list->pData, delay_list->size, delay_list->elemSize, compare);

    // show delay min max; the list is known to be non-empty here
    infoPrint(
            "spend %.6fs using "
            "%d threads complete query %d times,  "
            "min delay: %.6fs, "
            "avg delay: %.6fs, "
            "p90: %.6fs, "
            "p95: %.6fs, "
            "p99: %.6fs, "
            "max: %.6fs\n",
            spend/1E6,
            threadCnt, (int)delay_list->size,
            *(int64_t *)(benchArrayGet(delay_list, 0))/1E6,
            (double)total_delays/delay_list->size/1E6,
            *(int64_t *)(benchArrayGet(delay_list,
                                (int32_t)(delay_list->size * 0.9)))/1E6,
            *(int64_t *)(benchArrayGet(delay_list,
                                (int32_t)(delay_list->size * 0.95)))/1E6,
            *(int64_t *)(benchArrayGet(delay_list,
                                (int32_t)(delay_list->size * 0.99)))/1E6,
            *(int64_t *)(benchArrayGet(delay_list,
                                (int32_t)(delay_list->size - 1)))/1E6);

    benchArrayDestroy(delay_list);
}
446

447
//
448
// super table query
449
//
450
static int stbQuery(uint16_t iface, char* dbName) {
9✔
451
    int ret = -1;
9✔
452
    pthread_t * pidsOfSub   = NULL;
9✔
453
    qThreadInfo *threadInfos = NULL;
9✔
454
    g_queryInfo.superQueryInfo.totalQueried = 0;
9✔
455
    g_queryInfo.superQueryInfo.totalFail    = 0;
9✔
456

457
    // check
458
    if ((g_queryInfo.superQueryInfo.sqlCount == 0)
9✔
459
        || (g_queryInfo.superQueryInfo.threadCnt == 0)) {
9✔
UNCOV
460
        return 0;
×
461
    }
462

463
    // malloc 
464
    pidsOfSub = benchCalloc(1, g_queryInfo.superQueryInfo.threadCnt
9✔
465
                            *sizeof(pthread_t),
466
                            false);
467
    threadInfos = benchCalloc(1, g_queryInfo.superQueryInfo.threadCnt
9✔
468
                                *sizeof(qThreadInfo), false);
469

470
    int64_t ntables = g_queryInfo.superQueryInfo.childTblCount;
9✔
471
    int nConcurrent = g_queryInfo.superQueryInfo.threadCnt;
9✔
472

473
    int64_t a = ntables / nConcurrent;
9✔
474
    if (a < 1) {
9✔
475
        nConcurrent = (int)ntables;
2✔
476
        a = 1;
2✔
477
    }
478

479
    int64_t b = 0;
9✔
480
    if (nConcurrent != 0) {
9✔
481
        b = ntables % nConcurrent;
9✔
482
    }
483

484
    uint64_t tableFrom = 0;
9✔
485
    int threadCnt = 0;
9✔
486
    for (int i = 0; i < nConcurrent; i++) {
43✔
487
        qThreadInfo *pThreadInfo = threadInfos + i;
34✔
488
        pThreadInfo->threadID = i;
34✔
489
        pThreadInfo->start_table_from = tableFrom;
34✔
490
        pThreadInfo->ntables = i < b ? a + 1 : a;
34✔
491
        pThreadInfo->end_table_to =
34✔
492
                i < b ? tableFrom + a : tableFrom + a - 1;
34✔
493
        tableFrom = pThreadInfo->end_table_to + 1;
34✔
494
        // create conn
495
        if (initQueryConn(pThreadInfo, iface)){
34✔
NEW
496
            break;
×
497
        }
498
        int code = pthread_create(pidsOfSub + i, NULL, stbQueryThread, pThreadInfo);
34✔
499
        if (code != 0) {
34✔
NEW
500
            errorPrint("failed stbQueryThread create. error code =%d \n", code);
×
NEW
501
            break;
×
502
        }
503
        threadCnt ++;
34✔
504
    }
505

506
    bool needExit = false;
9✔
507
    // if failed, set termainte flag true like ctrl+c exit
508
    if (threadCnt != nConcurrent  ) {
9✔
NEW
509
        needExit = true;
×
NEW
510
        g_arguments->terminate = true;
×
NEW
511
        goto OVER;
×
512
    }
513

514
    // reset total
515
    g_queryInfo.superQueryInfo.totalQueried = 0;
9✔
516
    g_queryInfo.superQueryInfo.totalFail    = 0;
9✔
517

518
    // real thread count
519
    g_queryInfo.superQueryInfo.threadCnt = threadCnt;
9✔
520
    int64_t start = toolsGetTimestampUs();
9✔
521

522
    for (int i = 0; i < threadCnt; i++) {
43✔
523
        pthread_join(pidsOfSub[i], NULL);
34✔
524
        qThreadInfo *pThreadInfo = threadInfos + i;
34✔
525
        // add succ
526
        g_queryInfo.superQueryInfo.totalQueried += pThreadInfo->nSucc;
34✔
527
        if (g_arguments->continueIfFail == YES_IF_FAILED) {
34✔
528
            // "yes" need add fail cnt
NEW
529
            g_queryInfo.superQueryInfo.totalQueried += pThreadInfo->nFail;
×
NEW
530
            g_queryInfo.superQueryInfo.totalFail    += pThreadInfo->nFail;
×
531
        }
532

533
        // close conn
534
        closeQueryConn(pThreadInfo, iface);
34✔
535
    }
536
    int64_t end = toolsGetTimestampUs();
9✔
537

538
    if (needExit) {
9✔
NEW
539
        goto OVER;
×
540
    }
541

542
    // total show
543
    totalChildQuery(threadInfos, threadCnt, end - start);
9✔
544

545
    ret = 0;
9✔
546

547
OVER:
9✔
548
    tmfree((char *)pidsOfSub);
9✔
549
    tmfree((char *)threadInfos);
9✔
550

551
    for (int64_t i = 0; i < g_queryInfo.superQueryInfo.childTblCount; ++i) {
69✔
552
        tmfree(g_queryInfo.superQueryInfo.childTblName[i]);
60✔
553
    }
554
    tmfree(g_queryInfo.superQueryInfo.childTblName);
9✔
555
    return ret;
9✔
556
}
557

558
//
559
// specQuery
560
//
561
static int specQuery(uint16_t iface, char* dbName) {
12✔
562
    int ret = -1;
12✔
563
    pthread_t    *pids = NULL;
12✔
564
    qThreadInfo *infos = NULL;
12✔
565
    int    nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent;
12✔
566
    uint64_t nSqlCount = g_queryInfo.specifiedQueryInfo.sqls->size;
12✔
567
    g_queryInfo.specifiedQueryInfo.totalQueried = 0;
12✔
568
    g_queryInfo.specifiedQueryInfo.totalFail    = 0;
12✔
569

570
    // check invaid
571
    if(nSqlCount == 0 || nConcurrent == 0 ) {
12✔
UNCOV
572
        if(nSqlCount == 0)
×
UNCOV
573
           warnPrint("specified table query sql count is %" PRIu64 ".\n", nSqlCount);
×
UNCOV
574
        if(nConcurrent == 0)
×
NEW
575
           warnPrint("nConcurrent is %d , specified_table_query->nConcurrent is zero. \n", nConcurrent);
×
UNCOV
576
        return 0;
×
577
    }
578

579
    // malloc threads memory
580
    pids  = benchCalloc(1, nConcurrent * sizeof(pthread_t),  false);
12✔
581
    infos = benchCalloc(1, nConcurrent * sizeof(qThreadInfo), false);
12✔
582

583
    for (uint64_t i = 0; i < nSqlCount; i++) {
51✔
584
        if( g_arguments->terminate ) {
41✔
NEW
585
            break;
×
586
        }
587

588
        // reset
589
        memset(pids,  0, nConcurrent * sizeof(pthread_t));
41✔
590
        memset(infos, 0, nConcurrent * sizeof(qThreadInfo));
41✔
591

592
        // get execute sql
593
        SSQL *sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i);
41✔
594

595
        // create threads
596
        int threadCnt = 0;
41✔
597
        for (int j = 0; j < nConcurrent; j++) {
152✔
598
           qThreadInfo *pThreadInfo = infos + j;
111✔
599
           pThreadInfo->threadID = i * nConcurrent + j;
111✔
600
           pThreadInfo->querySeq = i;
111✔
601

602
           // create conn
603
           if (initQueryConn(pThreadInfo, iface)) {
111✔
NEW
604
               break;
×
605
           }
606

607
           int code = pthread_create(pids + j, NULL, specQueryThread, pThreadInfo);
111✔
608
           if (code != 0) {
111✔
NEW
609
               errorPrint("failed specQueryThread create. error code =%d \n", code);
×
NEW
610
               break;
×
611
           }
612
           threadCnt++;
111✔
613
        }
614

615
        bool needExit = false;
41✔
616
        // if failed, set termainte flag true like ctrl+c exit
617
        if (threadCnt != nConcurrent  ) {
41✔
NEW
618
            needExit = true;
×
UNCOV
619
            g_arguments->terminate = true;
×
620
        }
621

622
        int64_t start = toolsGetTimestampUs();
41✔
623
        // wait threads execute finished one by one
624
        for (int j = 0; j < threadCnt ; j++) {
152✔
625
           pthread_join(pids[j], NULL);
111✔
626
           qThreadInfo *pThreadInfo = infos + j;
111✔
627
           closeQueryConn(pThreadInfo, iface);
111✔
628

629
           // need exit in loop
630
           if (needExit) {
111✔
631
                // free BArray
NEW
632
                benchArrayDestroy(pThreadInfo->query_delay_list);
×
UNCOV
633
                pThreadInfo->query_delay_list = NULL;
×
634
           }
635
        }
636
        int64_t spend = toolsGetTimestampUs() - start;
41✔
637
        if(spend == 0) {
41✔
638
            // avoid xx/spend expr throw error
NEW
639
            spend = 1;
×
640
        }
641

642
        // create 
643
        if (needExit) {
41✔
NEW
644
            errorPrint("failed to create thread. expect nConcurrent=%d real threadCnt=%d,  exit testing.\n", nConcurrent, threadCnt);
×
NEW
645
            goto OVER;
×
646
        }
647

648
        //
649
        // show QPS and P90 ...
650
        //
651
        uint64_t n = 0;
41✔
652
        double  total_delays = 0.0;
41✔
653
        uint64_t totalQueried = 0;
41✔
654
        uint64_t totalFail    = 0;
41✔
655
        for (int j = 0; j < threadCnt; j++) {
152✔
656
           qThreadInfo *pThreadInfo = infos + j;
111✔
657
           if(pThreadInfo->query_delay_list == NULL) {
111✔
NEW
658
                continue;;
×
659
           }
660
           
661
           // total one sql
662
           for (uint64_t k = 0; k < pThreadInfo->query_delay_list->size; k++) {
330✔
663
                int64_t * delay = benchArrayGet(pThreadInfo->query_delay_list, k);
219✔
664
                sql->delay_list[n++] = *delay;
219✔
665
                total_delays += *delay;
219✔
666
           }
667

668
           // total queries
669
           totalQueried += pThreadInfo->nSucc;
111✔
670
           if (g_arguments->continueIfFail == YES_IF_FAILED) {
111✔
671
                totalQueried += pThreadInfo->nFail;
7✔
672
                totalFail    += pThreadInfo->nFail;
7✔
673
           }
674

675
           // free BArray query_delay_list
676
           benchArrayDestroy(pThreadInfo->query_delay_list);
111✔
677
           pThreadInfo->query_delay_list = NULL;
111✔
678
        }
679

680
        // appand current sql
681
        g_queryInfo.specifiedQueryInfo.totalQueried += totalQueried;
41✔
682
        g_queryInfo.specifiedQueryInfo.totalFail    += totalFail;
41✔
683

684
        // succ is zero
685
        if(totalQueried == 0 || n == 0) {
41✔
686
            errorPrint("%s", "succ queries count is zero.\n");
2✔
687
            goto OVER;
2✔
688
        }
689

690
        qsort(sql->delay_list, n, sizeof(uint64_t), compare);
39✔
691
        int32_t bufLen = strlen(sql->command) + 512;
39✔
692
        char * buf = benchCalloc(bufLen, sizeof(char), false);
39✔
693
        snprintf(buf , bufLen, "complete query with %d threads and %" PRIu64 " "
39✔
694
                             "sql %"PRIu64" spend %.6fs QPS: %.3f "
695
                             "query delay "
696
                             "avg: %.6fs "
697
                             "min: %.6fs "
698
                             "max: %.6fs "
699
                             "p90: %.6fs "
700
                             "p95: %.6fs "
701
                             "p99: %.6fs "
702
                             "SQL command: %s \n",
703
                             threadCnt, totalQueried,
704
                             i + 1, spend/1E6, totalQueried / (spend/1E6),
39✔
705
                             total_delays/n/1E6,           /* avg */
39✔
706
                             sql->delay_list[0] / 1E6,     /* min */
39✔
707
                             sql->delay_list[n - 1] / 1E6, /* max */
39✔
708
                             /*  p90 */
709
                             sql->delay_list[(uint64_t)(n * 0.90)] / 1E6,
39✔
710
                             /*  p95 */
711
                             sql->delay_list[(uint64_t)(n * 0.95)] / 1E6,
39✔
712
                             /*  p99 */
713
                             sql->delay_list[(uint64_t)(n * 0.99)] / 1E6, 
39✔
714
                             sql->command);
715

716
        infoPrintNoTimestamp("%s", buf);
39✔
717
        infoPrintNoTimestampToFile("%s", buf);
39✔
718
        tmfree(buf);
39✔
719
    }
720
    ret = 0;
10✔
721

722
OVER:
12✔
723
    tmfree((char *)pids);
12✔
724
    tmfree((char *)infos);
12✔
725

726
    // free specialQueryInfo
727
    freeSpecialQueryInfo();
12✔
728
    return ret;
12✔
729
}
730

731
//
732
// specQueryMix
733
//
734
static int specQueryMix(uint16_t iface, char* dbName) {
2✔
735
    // init
736
    int ret            = -1;
2✔
737
    int nConcurrent    = g_queryInfo.specifiedQueryInfo.concurrent;
2✔
738
    pthread_t * pids   = benchCalloc(nConcurrent, sizeof(pthread_t), true);
2✔
739
    qThreadInfo *infos = benchCalloc(nConcurrent, sizeof(qThreadInfo), true);
2✔
740

741
    // concurent calc
742
    int total_sql_num = g_queryInfo.specifiedQueryInfo.sqls->size;
2✔
743
    int start_sql     = 0;
2✔
744
    int a             = total_sql_num / nConcurrent;
2✔
745
    if (a < 1) {
2✔
746
        nConcurrent = total_sql_num;
1✔
747
        a = 1;
1✔
748
    }
749
    int b = 0;
2✔
750
    if (nConcurrent != 0) {
2✔
751
        b = total_sql_num % nConcurrent;
2✔
752
    }
753

754
    //
755
    // running
756
    //
757
    int threadCnt = 0;
2✔
758
    for (int i = 0; i < nConcurrent; ++i) {
6✔
759
        qThreadInfo *pThreadInfo = infos + i;
4✔
760
        pThreadInfo->threadID    = i;
4✔
761
        pThreadInfo->start_sql   = start_sql;
4✔
762
        pThreadInfo->end_sql     = i < b ? start_sql + a : start_sql + a - 1;
4✔
763
        start_sql = pThreadInfo->end_sql + 1;
4✔
764
        pThreadInfo->total_delay = 0;
4✔
765

766
        // create conn
767
        if (initQueryConn(pThreadInfo, iface)){
4✔
NEW
768
            break;
×
769
        }
770
        // main run
771
        int code = pthread_create(pids + i, NULL, specQueryMixThread, pThreadInfo);
4✔
772
        if (code != 0) {
4✔
NEW
773
            errorPrint("failed specQueryMixThread create. error code =%d \n", code);
×
NEW
774
            break;
×
775
        }
776
        
777
        threadCnt ++;
4✔
778
    }
779

780
    bool needExit = false;
2✔
781
    // if failed, set termainte flag true like ctrl+c exit
782
    if (threadCnt != nConcurrent) {
2✔
NEW
783
        needExit = true;
×
NEW
784
        g_arguments->terminate = true;
×
785
    }
786

787
    // reset total
788
    g_queryInfo.specifiedQueryInfo.totalQueried = 0;
2✔
789
    g_queryInfo.specifiedQueryInfo.totalFail    = 0;
2✔
790

791
    int64_t start = toolsGetTimestampUs();
2✔
792
    for (int i = 0; i < threadCnt; ++i) {
6✔
793
        pthread_join(pids[i], NULL);
4✔
794
        qThreadInfo *pThreadInfo = infos + i;
4✔
795
        closeQueryConn(pThreadInfo, iface);
4✔
796

797
        // total queries
798
        g_queryInfo.specifiedQueryInfo.totalQueried += pThreadInfo->nSucc;
4✔
799
        if (g_arguments->continueIfFail == YES_IF_FAILED) {
4✔
800
            // yes need add failed count
801
            g_queryInfo.specifiedQueryInfo.totalQueried += pThreadInfo->nFail;
2✔
802
            g_queryInfo.specifiedQueryInfo.totalFail    += pThreadInfo->nFail;
2✔
803
        }
804

805
        // destory
806
        if (needExit) {
4✔
NEW
807
            benchArrayDestroy(pThreadInfo->query_delay_list);
×
NEW
808
            pThreadInfo->query_delay_list = NULL;
×
809
        }
810
    }
811
    int64_t end = toolsGetTimestampUs();
2✔
812

813
    // create 
814
    if (needExit) {
2✔
NEW
815
        errorPrint("failed to create thread. expect nConcurrent=%d real threadCnt=%d,  exit testing.\n", nConcurrent, threadCnt);
×
NEW
816
        goto OVER;
×
817
    }
818

819
    // statistic
820
    totalChildQuery(infos, threadCnt, end - start);
2✔
821
    ret = 0;
2✔
822

823
OVER:
2✔
824
    tmfree(pids);
2✔
825
    tmfree(infos);
2✔
826

827
    // free sqls
828
    freeSpecialQueryInfo();
2✔
829

830
    return ret;
2✔
831
}
832

833
// total query for end 
834
void totalQuery(int64_t spends) {
21✔
835
    // total QPS
836
    uint64_t totalQueried = g_queryInfo.specifiedQueryInfo.totalQueried
21✔
837
        + g_queryInfo.superQueryInfo.totalQueried;
21✔
838

839
    // error rate
840
    char errRate[128] = "";
21✔
841
    if(g_arguments->continueIfFail == YES_IF_FAILED) {
21✔
842
        uint64_t totalFail = g_queryInfo.specifiedQueryInfo.totalFail + g_queryInfo.superQueryInfo.totalFail;
3✔
843
        if (totalQueried > 0) {
3✔
844
            snprintf(errRate, sizeof(errRate), " Error %" PRIu64 " Rate:%.3f%%", totalFail, ((float)totalFail * 100)/totalQueried);
3✔
845
        }
846
    }
847

848
    // show
849
    double  tInS = (double)spends / 1000;
21✔
850
    char buf[512] = "";
21✔
851
    snprintf(buf, sizeof(buf),
21✔
852
                "Spend %.4f second completed total queries: %" PRIu64
853
                ", the QPS of all threads: %10.3f%s\n\n",
854
                tInS, totalQueried, (double)totalQueried / tInS, errRate);
21✔
855
    infoPrint("%s", buf);
21✔
856
    infoPrintToFile("%s", buf);
21✔
857
}
21✔
858

859
int queryTestProcess() {
23✔
860
    prompt(0);
23✔
861

862
    if (REST_IFACE == g_queryInfo.iface) {
23✔
863
        encodeAuthBase64();
6✔
864
    }
865

866
    // kill sql for executing seconds over "kill_slow_query_threshold"
867
    if (g_queryInfo.iface == TAOSC_IFACE && g_queryInfo.killQueryThreshold) {
23✔
NEW
868
        int32_t ret = killSlowQuery();
×
NEW
869
        if (ret != 0) {
×
NEW
870
            return ret;
×
871
        }
872
    }
873

874
    // covert addr
875
    if (g_queryInfo.iface == REST_IFACE) {
23✔
876
        if (convertHostToServAddr(g_arguments->host,
6✔
877
                    g_arguments->port + TSDB_PORT_HTTP,
6✔
878
                    &(g_arguments->serv_addr)) != 0) {
6✔
879
            errorPrint("%s", "convert host to server address\n");
×
880
            return -1;
×
881
        }
882
    }
883

884
    // fetch child name if super table
885
    if ((g_queryInfo.superQueryInfo.sqlCount > 0) &&
23✔
886
            (g_queryInfo.superQueryInfo.threadCnt > 0)) {
9✔
887
        int32_t ret = fetchChildTableName(g_queryInfo.dbName, g_queryInfo.superQueryInfo.stbName);
9✔
888
        if (ret != 0) {
9✔
NEW
889
            errorPrint("fetchChildTableName dbName=%s stb=%s failed.", g_queryInfo.dbName, g_queryInfo.superQueryInfo.stbName);
×
UNCOV
890
            return -1;
×
891
        }
892
    }
893

894
    // 
895
    // start running
896
    //
897

898
    
899
    uint64_t startTs = toolsGetTimestampMs();
23✔
900
    if(g_queryInfo.specifiedQueryInfo.sqls && g_queryInfo.specifiedQueryInfo.sqls->size > 0) {
23✔
901
        // specified table
902
        if (g_queryInfo.specifiedQueryInfo.mixed_query) {
14✔
903
            // mixed
904
            if (specQueryMix(g_queryInfo.iface, g_queryInfo.dbName)) {
2✔
NEW
905
                return -1;
×
906
            }
907
        } else {
908
            // no mixied
909
            if (specQuery(g_queryInfo.iface, g_queryInfo.dbName)) {
12✔
910
                return -1;
2✔
911
            }
912
        }
913
    } else if(g_queryInfo.superQueryInfo.sqlCount > 0) {
9✔
914
        // super table
915
        if (stbQuery(g_queryInfo.iface, g_queryInfo.dbName)) {
9✔
UNCOV
916
            return -1;
×
917
        }
918
    } else {
919
        // nothing
NEW
920
        errorPrint("%s\n", "Both 'specified_table_query' and 'super_table_query' sqls is empty.");
×
UNCOV
921
        return -1;
×
922
    }
923

924
    // total 
925
    totalQuery(toolsGetTimestampMs() - startTs); 
21✔
926
    return 0;
21✔
927
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc