• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5025

17 Apr 2026 01:15AM UTC coverage: 73.009% (+0.06%) from 72.954%
#5025

push

travis-ci

web-flow
fix: replace \s with [[:space:]] in shell regex for macOS BSD compat (#35162)

6 of 7 new or added lines in 2 files covered. (85.71%)

2432 existing lines in 145 files now uncovered.

273326 of 374375 relevant lines covered (73.01%)

130781443.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.2
/source/client/src/clientRawBlockWrite.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include <string.h>
17
#include "cJSON.h"
18
#include "clientInt.h"
19
#include "clientRawBlockWrite.h"
20
#include "nodes.h"
21
#include "osMemPool.h"
22
#include "osMemory.h"
23
#include "parser.h"
24
#include "taosdef.h"
25
#include "tarray.h"
26
#include "tbase64.h"
27
#include "tcol.h"
28
#include "tcompression.h"
29
#include "tdatablock.h"
30
#include "tdataformat.h"
31
#include "tdef.h"
32
#include "tglobal.h"
33
#include "tmsgtype.h"
34

35
static int32_t  tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, uint32_t metaLen);
36
static tb_uid_t processSuid(tb_uid_t suid, char* db) {
40,816✔
37
  if (db == NULL) {
40,816✔
UNCOV
38
    return suid;
×
39
  }
40
  return suid + MurmurHash3_32(db, strlen(db));
40,816✔
41
}
42

43
static int32_t taosCreateStb(TAOS* taos, void* meta, uint32_t metaLen) {
40,816✔
44
  if (taos == NULL || meta == NULL) {
40,816✔
UNCOV
45
    uError("invalid parameter in %s", __func__);
×
UNCOV
46
    return TSDB_CODE_INVALID_PARA;
×
47
  }
48
  SVCreateStbReq req = {0};
40,816✔
49
  SDecoder       coder = {0};
40,816✔
50
  SMCreateStbReq pReq = {0};
40,816✔
51
  int32_t        code = TSDB_CODE_SUCCESS;
40,816✔
52
  int32_t        lino = 0;
40,816✔
53
  SRequestObj*   pRequest = NULL;
40,816✔
54
  SCmdMsgInfo    pCmdMsg = {0};
40,816✔
55
  RAW_LOG_START
40,816✔
56

57
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
40,816✔
58
  uDebug(LOG_ID_TAG " create stable, meta:%p, metaLen:%d, db:%s", LOG_ID_VALUE, meta, metaLen,
40,816✔
59
         pRequest->pDb ? pRequest->pDb : "NULL");
60
  pRequest->syncQuery = true;
40,816✔
61
  if (!pRequest->pDb) {
40,816✔
UNCOV
62
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
UNCOV
63
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
×
UNCOV
64
    goto end;
×
65
  }
66
  // decode and process req
67
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
40,816✔
68
  uint32_t len = metaLen - sizeof(SMsgHead);
40,816✔
69
  tDecoderInit(&coder, data, len);
40,816✔
70
  RAW_RETURN_CHECK(tDecodeSVCreateStbReq(&coder, &req));
40,816✔
71

72
  int8_t           createDefaultCompress = 0;
40,816✔
73
  SColCmprWrapper* p = &req.colCmpr;
40,816✔
74
  if (p->nCols == 0) {
40,816✔
UNCOV
75
    createDefaultCompress = 1;
×
76
  }
77
  // build create stable
78
  pReq.pColumns = taosArrayInit(req.schemaRow.nCols, sizeof(SFieldWithOptions));
40,816✔
79
  RAW_NULL_CHECK(pReq.pColumns);
40,816✔
80
  for (int32_t i = 0; i < req.schemaRow.nCols; i++) {
256,940✔
81
    SSchema*          pSchema = req.schemaRow.pSchema + i;
216,124✔
82
    SFieldWithOptions field = {.type = pSchema->type, .flags = pSchema->flags, .bytes = pSchema->bytes};
216,124✔
83
    tstrncpy(field.name, pSchema->name, TSDB_COL_NAME_LEN);
216,124✔
84

85
    if (createDefaultCompress) {
216,124✔
UNCOV
86
      field.compress = createDefaultColCmprByType(pSchema->type);
×
87
    } else {
88
      SColCmpr* pCmp = &req.colCmpr.pColCmpr[i];
216,124✔
89
      field.compress = pCmp->alg;
216,124✔
90
    }
91
    if (req.pExtSchemas) field.typeMod = req.pExtSchemas[i].typeMod;
216,124✔
92
    RAW_NULL_CHECK(taosArrayPush(pReq.pColumns, &field));
432,248✔
93
  }
94
  pReq.pTags = taosArrayInit(req.schemaTag.nCols, sizeof(SField));
40,816✔
95
  RAW_NULL_CHECK(pReq.pTags);
40,816✔
96
  for (int32_t i = 0; i < req.schemaTag.nCols; i++) {
148,218✔
97
    SSchema* pSchema = req.schemaTag.pSchema + i;
107,402✔
98
    SField   field = {.type = pSchema->type, .flags = pSchema->flags, .bytes = pSchema->bytes};
107,402✔
99
    tstrncpy(field.name, pSchema->name, TSDB_COL_NAME_LEN);
107,402✔
100
    RAW_NULL_CHECK(taosArrayPush(pReq.pTags, &field));
214,804✔
101
  }
102

103
  pReq.colVer = req.schemaRow.version;
40,816✔
104
  pReq.tagVer = req.schemaTag.version;
40,816✔
105
  pReq.numOfColumns = req.schemaRow.nCols;
40,816✔
106
  pReq.numOfTags = req.schemaTag.nCols;
40,816✔
107
  pReq.commentLen = -1;
40,816✔
108
  pReq.suid = processSuid(req.suid, pRequest->pDb);
40,816✔
109
  pReq.source = TD_REQ_FROM_TAOX;
40,816✔
110
  pReq.igExists = true;
40,816✔
111
  pReq.virtualStb = req.virtualStb;
40,816✔
112

113
  uDebug(LOG_ID_TAG " create stable name:%s suid:%" PRId64 " processSuid:%" PRId64, LOG_ID_VALUE, req.name, req.suid,
40,816✔
114
         pReq.suid);
115
  STscObj* pTscObj = pRequest->pTscObj;
40,816✔
116
  SName    tableName = {0};
40,816✔
117
  toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName);
40,816✔
118
  RAW_RETURN_CHECK(tNameExtractFullName(&tableName, pReq.name));
40,816✔
119
  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
40,816✔
120
  pCmdMsg.msgType = TDMT_MND_CREATE_STB;
40,816✔
121
  pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
40,816✔
122
  RAW_FALSE_CHECK(pCmdMsg.msgLen > 0);
40,816✔
123
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
40,816✔
124
  RAW_NULL_CHECK(pCmdMsg.pMsg);
40,816✔
125
  RAW_FALSE_CHECK(tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq) > 0);
40,816✔
126

127
  SQuery pQuery = {0};
40,816✔
128
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
40,816✔
129
  pQuery.pCmdMsg = &pCmdMsg;
40,816✔
130
  pQuery.msgType = pQuery.pCmdMsg->msgType;
40,816✔
131
  pQuery.stableQuery = true;
40,816✔
132

133
  launchQueryImpl(pRequest, &pQuery, true, NULL);  // ignore, because return value is pRequest
40,816✔
134

135
  if (pRequest->code == TSDB_CODE_SUCCESS) {
40,816✔
136
    SCatalog* pCatalog = NULL;
40,816✔
137
    RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
40,816✔
138
    RAW_RETURN_CHECK(catalogRemoveTableMeta(pCatalog, &tableName));
40,816✔
139
  }
140

141
  code = pRequest->code;
40,816✔
142
  uDebug(LOG_ID_TAG " create stable return, msg:%s", LOG_ID_VALUE, tstrerror(code));
40,816✔
143

144
end:
40,816✔
145
  destroyRequest(pRequest);
40,816✔
146
  tFreeSMCreateStbReq(&pReq);
40,816✔
147
  tDecoderClear(&coder);
40,816✔
148
  taosMemoryFree(pCmdMsg.pMsg);
40,816✔
149
  RAW_LOG_END
40,816✔
150
  return code;
40,816✔
151
}
152

153
static int32_t taosDropStb(TAOS* taos, void* meta, uint32_t metaLen) {
2,482✔
154
  if (taos == NULL || meta == NULL) {
2,482✔
UNCOV
155
    uError("invalid parameter in %s", __func__);
×
UNCOV
156
    return TSDB_CODE_INVALID_PARA;
×
157
  }
158
  SVDropStbReq req = {0};
2,482✔
159
  SDecoder     coder = {0};
2,482✔
160
  SMDropStbReq pReq = {0};
2,482✔
161
  int32_t      code = TSDB_CODE_SUCCESS;
2,482✔
162
  int32_t      lino = 0;
2,482✔
163
  SRequestObj* pRequest = NULL;
2,482✔
164
  SCmdMsgInfo  pCmdMsg = {0};
2,482✔
165

166
  RAW_LOG_START
2,482✔
167
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
2,482✔
168
  uDebug(LOG_ID_TAG " drop stable, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);
2,482✔
169
  pRequest->syncQuery = true;
2,482✔
170
  if (!pRequest->pDb) {
2,482✔
UNCOV
171
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
×
UNCOV
172
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
UNCOV
173
    goto end;
×
174
  }
175
  // decode and process req
176
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
2,482✔
177
  uint32_t len = metaLen - sizeof(SMsgHead);
2,482✔
178
  tDecoderInit(&coder, data, len);
2,482✔
179
  RAW_RETURN_CHECK(tDecodeSVDropStbReq(&coder, &req));
2,482✔
180
  SCatalog* pCatalog = NULL;
2,482✔
181
  RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));
2,482✔
182
  SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
2,482✔
183
                           .requestId = pRequest->requestId,
2,482✔
184
                           .requestObjRefId = pRequest->self,
2,482✔
185
                           .mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)};
2,482✔
186
  SName            pName = {0};
2,482✔
187
  toName(pRequest->pTscObj->acctId, pRequest->pDb, req.name, &pName);
2,482✔
188
  STableMeta* pTableMeta = NULL;
2,482✔
189
  code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
2,482✔
190
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
2,482✔
191
    uInfo(LOG_ID_TAG " stable %s not exist, ignore drop", LOG_ID_VALUE, req.name);
614✔
192
    code = TSDB_CODE_SUCCESS;
614✔
193
    taosMemoryFreeClear(pTableMeta);
614✔
194
    goto end;
614✔
195
  }
196
  RAW_RETURN_CHECK(code);
1,868✔
197
  pReq.suid = pTableMeta->uid;
1,868✔
198
  taosMemoryFreeClear(pTableMeta);
1,868✔
199

200
  // build drop stable
201
  pReq.igNotExists = true;
1,868✔
202
  pReq.source = TD_REQ_FROM_TAOX;
1,868✔
203
  //  pReq.suid = processSuid(req.suid, pRequest->pDb);
204

205
  uDebug(LOG_ID_TAG " drop stable name:%s suid:%" PRId64 " new suid:%" PRId64, LOG_ID_VALUE, req.name, req.suid,
1,868✔
206
         pReq.suid);
207
  STscObj* pTscObj = pRequest->pTscObj;
1,868✔
208
  SName    tableName = {0};
1,868✔
209
  toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName);
1,868✔
210
  RAW_RETURN_CHECK(tNameExtractFullName(&tableName, pReq.name));
1,868✔
211

212
  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
1,868✔
213
  pCmdMsg.msgType = TDMT_MND_DROP_STB;
1,868✔
214
  pCmdMsg.msgLen = tSerializeSMDropStbReq(NULL, 0, &pReq);
1,868✔
215
  RAW_FALSE_CHECK(pCmdMsg.msgLen > 0);
1,868✔
216
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
1,868✔
217
  RAW_NULL_CHECK(pCmdMsg.pMsg);
1,868✔
218
  RAW_FALSE_CHECK(tSerializeSMDropStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq) > 0);
1,868✔
219

220
  SQuery pQuery = {0};
1,868✔
221
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
1,868✔
222
  pQuery.pCmdMsg = &pCmdMsg;
1,868✔
223
  pQuery.msgType = pQuery.pCmdMsg->msgType;
1,868✔
224
  pQuery.stableQuery = true;
1,868✔
225

226
  launchQueryImpl(pRequest, &pQuery, true, NULL);  // ignore, because return value is pRequest
1,868✔
227
  if (pRequest->code == TSDB_CODE_SUCCESS) {
1,868✔
228
    // ignore the error code
229
    RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
1,868✔
230
    RAW_RETURN_CHECK(catalogRemoveTableMeta(pCatalog, &tableName));
1,868✔
231
  }
232

233
  code = pRequest->code;
1,868✔
234
  uDebug(LOG_ID_TAG " drop stable return, msg:%s", LOG_ID_VALUE, tstrerror(code));
1,868✔
235

236
end:
2,482✔
237
  RAW_LOG_END
2,482✔
238
  destroyRequest(pRequest);
2,482✔
239
  tDecoderClear(&coder);
2,482✔
240
  return code;
2,482✔
241
}
242

243
typedef struct SVgroupCreateTableBatch {
244
  SVCreateTbBatchReq req;
245
  SVgroupInfo        info;
246
  char               dbName[TSDB_DB_NAME_LEN];
247
} SVgroupCreateTableBatch;
248

249
static void destroyCreateTbReqBatch(void* data) {
55,928✔
250
  if (data == NULL) {
55,928✔
251
    uError("invalid parameter in %s", __func__);
×
252
    return;
×
253
  }
254
  SVgroupCreateTableBatch* pTbBatch = (SVgroupCreateTableBatch*)data;
55,928✔
255
  taosArrayDestroy(pTbBatch->req.pArray);
55,928✔
256
}
257

258
static const SSchema* getNormalColSchema(const STableMeta* pTableMeta, const char* pColName) {
6,840✔
259
  for (int32_t i = 0; i < pTableMeta->tableInfo.numOfColumns; ++i) {
19,760✔
260
    const SSchema* pSchema = pTableMeta->schema + i;
19,760✔
261
    if (0 == strcmp(pColName, pSchema->name)) {
19,760✔
262
      return pSchema;
6,840✔
263
    }
264
  }
UNCOV
265
  return NULL;
×
266
}
267

268
static STableMeta* getTableMeta(SCatalog* pCatalog, SRequestConnInfo* conn, char* dbName, char* tbName, int32_t acctId){
6,840✔
269
  SName       sName = {0};
6,840✔
270
  toName(acctId, dbName, tbName, &sName);
6,840✔
271
  STableMeta* pTableMeta = NULL;
6,840✔
272
  int32_t code = catalogGetTableMeta(pCatalog, conn, &sName, &pTableMeta);
6,840✔
273
  if (code != 0) {
6,840✔
UNCOV
274
    uError("failed to get table meta for reference table:%s.%s", dbName, tbName);
×
UNCOV
275
    taosMemoryFreeClear(pTableMeta);
×
UNCOV
276
    terrno = code;
×
UNCOV
277
    return NULL;
×
278
  }
279
  return pTableMeta;
6,840✔
280
}
281

282
static int32_t checkColRef(STableMeta* pTableMeta, char* colName, uint8_t precision, const char* pSchemaName,
6,460✔
283
                           const SDataType* pType) {
284
  int32_t code = TSDB_CODE_SUCCESS;
6,460✔
285
  if (pTableMeta->tableInfo.precision != precision) {
6,460✔
UNCOV
286
    code = TSDB_CODE_PAR_INVALID_REF_COLUMN_TYPE;
×
UNCOV
287
    uError("timestamp precision of virtual table and its reference table do not match");
×
UNCOV
288
    goto end;
×
289
  }
290
  // org table cannot has composite primary key
291
  if (pTableMeta->tableInfo.numOfColumns > 1 && pTableMeta->schema[1].flags & COL_IS_KEY) {
6,460✔
UNCOV
292
    code = TSDB_CODE_PAR_INVALID_REF_COLUMN;
×
UNCOV
293
    uError("virtual table's column:\"%s\"'s reference can not from table with composite key", colName);
×
UNCOV
294
    goto end;
×
295
  }
296

297
  // org table must be child table or normal table
298
  if (pTableMeta->tableType != TSDB_NORMAL_TABLE && pTableMeta->tableType != TSDB_CHILD_TABLE) {
6,460✔
UNCOV
299
    code = TSDB_CODE_PAR_INVALID_REF_COLUMN;
×
300
    uError("virtual table's column:\"%s\"'s reference can only be normal table or child table", colName);
×
301
    goto end;
×
302
  }
303

304
  const SSchema* pRefCol = getNormalColSchema(pTableMeta, colName);
6,460✔
305
  if (NULL == pRefCol) {
6,460✔
306
    code = TSDB_CODE_PAR_INVALID_REF_COLUMN;
×
307
    uError("virtual table's column:\"%s\"'s reference column:\"%s\" not exist", pSchemaName, colName);
×
UNCOV
308
    goto end;
×
309
  }
310

311
  int32_t refColIndex = getNormalColSchemaIndex(pTableMeta, colName);
6,460✔
312
  const SSchemaExt* pRefSchemaExt =
6,460✔
313
      (refColIndex >= 0 && pTableMeta->schemaExt && refColIndex < pTableMeta->tableInfo.numOfColumns)
6,460✔
314
          ? pTableMeta->schemaExt + refColIndex
6,460✔
315
          : NULL;
12,920✔
316
  SDataType refType = {0};
6,460✔
317
  schemaToRefDataType(pRefCol, NULL != pRefSchemaExt ? pRefSchemaExt->typeMod : 0, &refType);
6,460✔
318

319
  if (!isSameRefDataType(pType, &refType)) {
6,460✔
UNCOV
320
    code = TSDB_CODE_PAR_INVALID_REF_COLUMN_TYPE;
×
UNCOV
321
    uError("virtual table's column:\"%s\"'s type and reference column:\"%s\"'s type not match, %d %d %d %d",
×
322
            pSchemaName, colName, pType->type, pType->bytes, refType.type, refType.bytes);
UNCOV
323
    goto end;
×
324
  }
325

326
end:
6,460✔
327
  return code;
6,460✔
328
}
329

330
static int32_t checkColRefForCreate(SCatalog* pCatalog, SRequestConnInfo* conn, SColRef* pColRef, int32_t acctId,
6,080✔
331
                                    uint8_t precision, SSchema* pSchema, const SSchemaExt* pSchemaExt) {
332
  STableMeta* pTableMeta = getTableMeta(pCatalog, conn, pColRef->refDbName, pColRef->refTableName, acctId);
6,080✔
333
  if (pTableMeta == NULL) {
6,080✔
UNCOV
334
      return terrno;
×
335
  }
336
  SDataType colType = {0};
6,080✔
337
  schemaToRefDataType(pSchema, NULL != pSchemaExt ? pSchemaExt->typeMod : 0, &colType);
6,080✔
338
  int32_t code = checkColRef(pTableMeta, pColRef->refColName, precision, pSchema->name, &colType);
6,080✔
339
  taosMemoryFreeClear(pTableMeta);
6,080✔
340
  return code;
6,080✔
341
}
342

UNCOV
343
static int32_t checkColRefForAdd(SCatalog* pCatalog, SRequestConnInfo* conn, int32_t acctId, char* dbName, char* tbName, char* colName, 
×
344
  char* dbNameSrc, char* tbNameSrc, char* colNameSrc, int8_t type, int32_t bytes, STypeMod typeMod) {
UNCOV
345
  int32_t code = 0;
×
346
  STableMeta* pTableMeta = getTableMeta(pCatalog, conn, dbName, tbName, acctId);
×
347
  if (pTableMeta == NULL) {
×
348
    code = terrno;
×
349
    goto end;
×
350
  }
351
  STableMeta* pTableMetaSrc = getTableMeta(pCatalog, conn, dbNameSrc, tbNameSrc, acctId);
×
352
  if (pTableMetaSrc == NULL) {
×
UNCOV
353
    code = terrno;
×
354
    goto end;
×
355
  }
356

357
  SDataType colType = {.type = type, .bytes = bytes};
×
358
  fillTypeFromTypeMod(&colType, typeMod);
×
359
  code = checkColRef(pTableMeta, colName, pTableMetaSrc->tableInfo.precision, colNameSrc, &colType);
×
360

361
end:
×
362
  taosMemoryFreeClear(pTableMeta);
×
UNCOV
363
  taosMemoryFreeClear(pTableMetaSrc);
×
UNCOV
364
  return code;
×
365
}
366

367
static int32_t checkColRefForAlter(SCatalog* pCatalog, SRequestConnInfo* conn, int32_t acctId, char* dbName, char* tbName, char* colName, 
380✔
368
  char* dbNameSrc, char* tbNameSrc, char* colNameSrc) {
369
  int32_t code = 0;
380✔
370
  STableMeta* pTableMeta = getTableMeta(pCatalog, conn, dbName, tbName, acctId);
380✔
371
  if (pTableMeta == NULL) {
380✔
UNCOV
372
    code = terrno;
×
UNCOV
373
    goto end;
×
374
  }
375
  STableMeta* pTableMetaSrc = getTableMeta(pCatalog, conn, dbNameSrc, tbNameSrc, acctId);
380✔
376
  if (pTableMetaSrc == NULL) {
380✔
UNCOV
377
    code = terrno;
×
UNCOV
378
    goto end;
×
379
  }
380
  const SSchema* pSchema = getNormalColSchema(pTableMetaSrc, colNameSrc);
380✔
381
  if (NULL == pSchema) {
380✔
UNCOV
382
    code = TSDB_CODE_PAR_INVALID_REF_COLUMN;
×
UNCOV
383
    uError("virtual table's column:\"%s\" not exist", colNameSrc);
×
UNCOV
384
    goto end;
×
385
  }
386

387
  int32_t schemaIdx = getNormalColSchemaIndex(pTableMetaSrc, colNameSrc);
380✔
388
  const SSchemaExt* pSchemaExt =
380✔
389
      (schemaIdx >= 0 && pTableMetaSrc->schemaExt && schemaIdx < pTableMetaSrc->tableInfo.numOfColumns)
380✔
390
          ? pTableMetaSrc->schemaExt + schemaIdx
380✔
391
          : NULL;
760✔
392
  SDataType colType = {0};
380✔
393
  schemaToRefDataType(pSchema, NULL != pSchemaExt ? pSchemaExt->typeMod : 0, &colType);
380✔
394
  code = checkColRef(pTableMeta, colName, pTableMetaSrc->tableInfo.precision, pSchema->name, &colType);
380✔
395

396
end:
380✔
397
  taosMemoryFreeClear(pTableMeta);
380✔
398
  taosMemoryFreeClear(pTableMetaSrc);
380✔
399
  return code;
380✔
400
}
401

402
static int32_t taosCreateTable(TAOS* taos, void* meta, uint32_t metaLen) {
55,310✔
403
  if (taos == NULL || meta == NULL) {
55,310✔
UNCOV
404
    uError("invalid parameter in %s", __func__);
×
UNCOV
405
    return TSDB_CODE_INVALID_PARA;
×
406
  }
407
  SVCreateTbBatchReq req = {0};
55,310✔
408
  SDecoder           coder = {0};
55,310✔
409
  int32_t            code = TSDB_CODE_SUCCESS;
55,310✔
410
  int32_t            lino = 0;
55,310✔
411
  SRequestObj*       pRequest = NULL;
55,310✔
412
  SQuery*            pQuery = NULL;
55,310✔
413
  SHashObj*          pVgroupHashmap = NULL;
55,310✔
414

415
  RAW_LOG_START
55,310✔
416
  SArray* pTagList = taosArrayInit(0, POINTER_BYTES);
55,310✔
417
  RAW_NULL_CHECK(pTagList);
55,310✔
418
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
55,310✔
419
  uDebug(LOG_ID_TAG " create table, meta:%p, metaLen:%d, db:%s", LOG_ID_VALUE, meta, metaLen,
55,310✔
420
         pRequest->pDb ? pRequest->pDb : "NULL");
421

422
  pRequest->syncQuery = true;
55,310✔
423
  if (!pRequest->pDb) {
55,310✔
UNCOV
424
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
×
425
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
426
    goto end;
×
427
  }
428
  // decode and process req
429
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
55,310✔
430
  uint32_t len = metaLen - sizeof(SMsgHead);
55,310✔
431
  tDecoderInit(&coder, data, len);
55,310✔
432
  RAW_RETURN_CHECK(tDecodeSVCreateTbBatchReq(&coder, &req));
55,310✔
433
  STscObj* pTscObj = pRequest->pTscObj;
55,310✔
434

435
  SVCreateTbReq* pCreateReq = NULL;
55,310✔
436
  SCatalog*      pCatalog = NULL;
55,310✔
437
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
55,310✔
438
  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
55,310✔
439
  RAW_NULL_CHECK(pVgroupHashmap);
55,310✔
440
  taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch);
55,310✔
441

442
  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
55,310✔
443
                           .requestId = pRequest->requestId,
55,310✔
444
                           .requestObjRefId = pRequest->self,
55,310✔
445
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
55,310✔
446

447
  pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
55,310✔
448
  RAW_NULL_CHECK(pRequest->tableList);
55,310✔
449
  // loop to create table
450
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
114,106✔
451
    pCreateReq = req.pReqs + iReq;
58,796✔
452

453
    SVgroupInfo pInfo = {0};
58,796✔
454
    SName       pName = {0};
58,796✔
455
    toName(pTscObj->acctId, pRequest->pDb, pCreateReq->name, &pName);
58,796✔
456
    RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo));
58,796✔
457

458
    pCreateReq->flags |= TD_CREATE_IF_NOT_EXISTS;
58,796✔
459
    // change tag cid to new cid
460
    if (pCreateReq->type == TSDB_CHILD_TABLE || pCreateReq->type == TSDB_VIRTUAL_CHILD_TABLE) {
58,796✔
461
      STableMeta* pTableMeta = NULL;
49,418✔
462
      SName       sName = {0};
49,418✔
463
      tb_uid_t    oldSuid = pCreateReq->ctb.suid;
49,418✔
464
      //      pCreateReq->ctb.suid = processSuid(pCreateReq->ctb.suid, pRequest->pDb);
465
      toName(pTscObj->acctId, pRequest->pDb, pCreateReq->ctb.stbName, &sName);
49,418✔
466
      code = catalogGetTableMeta(pCatalog, &conn, &sName, &pTableMeta);
49,418✔
467
      if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
49,418✔
UNCOV
468
        uInfo(LOG_ID_TAG " super table %s not exist, ignore create child table %s", LOG_ID_VALUE,
×
469
              pCreateReq->ctb.stbName, pCreateReq->name);
UNCOV
470
        code = TSDB_CODE_SUCCESS;
×
UNCOV
471
        taosMemoryFreeClear(pTableMeta);
×
UNCOV
472
        continue;
×
473
      }
474

475
      RAW_RETURN_CHECK(code);
49,418✔
476
      pCreateReq->ctb.suid = pTableMeta->uid;
49,418✔
477

478
      bool changeDB = strlen(tmqWriteRefDB) > 0;
49,418✔
479
      for (int32_t i = 0; changeDB && i < pCreateReq->colRef.nCols; i++) {
64,634✔
480
        SColRef* pColRef = pCreateReq->colRef.pColRef + i;
15,216✔
481
        tstrncpy(pColRef->refDbName, tmqWriteRefDB, TSDB_DB_NAME_LEN);
15,216✔
482
      }
483

484
      for (int32_t i = 0; tmqWriteCheckRef && i < pCreateReq->colRef.nCols && i < pTableMeta->tableInfo.numOfColumns; i++) {
61,578✔
485
        SColRef* pColRef = pCreateReq->colRef.pColRef + i;
12,160✔
486
        if (!pColRef || !pColRef->hasRef) continue;
12,160✔
487
        SSchema* pSchema = pTableMeta->schema + i;
6,080✔
488
        const SSchemaExt* pSchemaExt =
6,080✔
489
            (pTableMeta->schemaExt && i < pTableMeta->tableInfo.numOfColumns) ? pTableMeta->schemaExt + i : NULL;
6,080✔
490
        RAW_RETURN_CHECK(
6,080✔
491
            checkColRefForCreate(pCatalog, &conn, pColRef, pTscObj->acctId, pTableMeta->tableInfo.precision, pSchema,
492
                                 pSchemaExt));
493
      }
494
      
495
      SArray* pTagVals = NULL;
49,418✔
496
      code = tTagToValArray((STag*)pCreateReq->ctb.pTag, &pTagVals);
49,418✔
497
      if (code != TSDB_CODE_SUCCESS) {
49,418✔
UNCOV
498
        uError("create tb invalid tag data %s", pCreateReq->name);
×
UNCOV
499
        taosMemoryFreeClear(pTableMeta);
×
500
        goto end;
×
501
      }
502

503
      bool rebuildTag = false;
49,418✔
504
      for (int32_t i = 0; i < taosArrayGetSize(pCreateReq->ctb.tagName); i++) {
146,722✔
505
        char* tName = taosArrayGet(pCreateReq->ctb.tagName, i);
97,304✔
506
        if (tName == NULL) {
97,304✔
UNCOV
507
          continue;
×
508
        }
509
        for (int32_t j = pTableMeta->tableInfo.numOfColumns;
97,304✔
510
             j < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; j++) {
368,845✔
511
          SSchema* tag = &pTableMeta->schema[j];
271,541✔
512
          if (strcmp(tag->name, tName) == 0 && tag->type != TSDB_DATA_TYPE_JSON) {
271,541✔
513
            STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);
92,963✔
514
            if (pTagVal) {
92,963✔
515
              if (pTagVal->cid != tag->colId) {
92,963✔
516
                pTagVal->cid = tag->colId;
6,508✔
517
                rebuildTag = true;
6,508✔
518
              }
519
            } else {
UNCOV
520
              uError("create tb invalid data %s, size:%d index:%d cid:%d", pCreateReq->name,
×
521
                     (int)taosArrayGetSize(pTagVals), i, tag->colId);
522
            }
523
          }
524
        }
525
      }
526
      taosMemoryFreeClear(pTableMeta);
49,418✔
527
      if (rebuildTag) {
49,418✔
528
        STag* ppTag = NULL;
4,050✔
529
        code = tTagNew(pTagVals, 1, false, &ppTag);
4,050✔
530
        taosArrayDestroy(pTagVals);
4,050✔
531
        pTagVals = NULL;
4,050✔
532
        if (code != TSDB_CODE_SUCCESS) {
4,050✔
UNCOV
533
          goto end;
×
534
        }
535
        if (NULL == taosArrayPush(pTagList, &ppTag)) {
4,050✔
536
          code = terrno;
×
UNCOV
537
          tTagFree(ppTag);
×
UNCOV
538
          goto end;
×
539
        }
540
        pCreateReq->ctb.pTag = (uint8_t*)ppTag;
4,050✔
541
      }
542
      taosArrayDestroy(pTagVals);
49,418✔
543
    }
544
    RAW_NULL_CHECK(taosArrayPush(pRequest->tableList, &pName));
117,592✔
545

546
    SVgroupCreateTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
58,796✔
547
    if (pTableBatch == NULL) {
58,796✔
548
      SVgroupCreateTableBatch tBatch = {0};
55,928✔
549
      tBatch.info = pInfo;
55,928✔
550
      tstrncpy(tBatch.dbName, pRequest->pDb, TSDB_DB_NAME_LEN);
55,928✔
551

552
      tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq));
55,928✔
553
      RAW_NULL_CHECK(tBatch.req.pArray);
55,928✔
554
      RAW_NULL_CHECK(taosArrayPush(tBatch.req.pArray, pCreateReq));
111,856✔
555
      tBatch.req.source = TD_REQ_FROM_TAOX;
55,928✔
556
      RAW_RETURN_CHECK(taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch)));
55,928✔
557
    } else {  // add to the correct vgroup
558
      RAW_NULL_CHECK(taosArrayPush(pTableBatch->req.pArray, pCreateReq));
5,736✔
559
    }
560
  }
561

562
  if (taosHashGetSize(pVgroupHashmap) == 0) {
55,310✔
UNCOV
563
    goto end;
×
564
  }
565
  SArray* pBufArray = NULL;
55,310✔
566
  RAW_RETURN_CHECK(serializeVgroupsCreateTableBatch(pVgroupHashmap, &pBufArray));
55,310✔
567
  pQuery = NULL;
55,310✔
568
  code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
55,310✔
569
  if (TSDB_CODE_SUCCESS != code) goto end;
55,310✔
570
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
55,310✔
571
  pQuery->msgType = TDMT_VND_CREATE_TABLE;
55,310✔
572
  pQuery->stableQuery = false;
55,310✔
573
  code = nodesMakeNode(QUERY_NODE_CREATE_TABLE_STMT, &pQuery->pRoot);
55,310✔
574
  if (TSDB_CODE_SUCCESS != code) goto end;
55,310✔
575
  RAW_NULL_CHECK(pQuery->pRoot);
55,310✔
576

577
  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pBufArray));
55,310✔
578

579
  launchQueryImpl(pRequest, pQuery, true, NULL);
55,310✔
580
  if (pRequest->code == TSDB_CODE_SUCCESS) {
55,310✔
581
    RAW_RETURN_CHECK(removeMeta(pTscObj, pRequest->tableList, false));
55,310✔
582
  }
583

584
  code = pRequest->code;
55,310✔
585
  uDebug(LOG_ID_TAG " create table return, msg:%s", LOG_ID_VALUE, tstrerror(code));
55,310✔
586

587
end:
55,310✔
588
  tDeleteSVCreateTbBatchReq(&req);
55,310✔
589

590
  taosHashCleanup(pVgroupHashmap);
55,310✔
591
  destroyRequest(pRequest);
55,310✔
592
  tDecoderClear(&coder);
55,310✔
593
  qDestroyQuery(pQuery);
55,310✔
594
  taosArrayDestroyP(pTagList, NULL);
55,310✔
595
  RAW_LOG_END
55,310✔
596
  return code;
55,310✔
597
}
598

599
typedef struct SVgroupDropTableBatch {
600
  SVDropTbBatchReq req;
601
  SVgroupInfo      info;
602
  char             dbName[TSDB_DB_NAME_LEN];
603
} SVgroupDropTableBatch;
604

605
static void destroyDropTbReqBatch(void* data) {
1,593✔
606
  if (data == NULL) {
1,593✔
UNCOV
607
    uError("invalid parameter in %s", __func__);
×
UNCOV
608
    return;
×
609
  }
610
  SVgroupDropTableBatch* pTbBatch = (SVgroupDropTableBatch*)data;
1,593✔
611
  taosArrayDestroy(pTbBatch->req.pArray);
1,593✔
612
}
613

614
static int32_t taosDropTable(TAOS* taos, void* meta, uint32_t metaLen) {
2,207✔
615
  if (taos == NULL || meta == NULL) {
2,207✔
UNCOV
616
    uError("invalid parameter in %s", __func__);
×
UNCOV
617
    return TSDB_CODE_INVALID_PARA;
×
618
  }
619
  SVDropTbBatchReq req = {0};
2,207✔
620
  SDecoder         coder = {0};
2,207✔
621
  int32_t          code = TSDB_CODE_SUCCESS;
2,207✔
622
  int32_t          lino = 0;
2,207✔
623
  SRequestObj*     pRequest = NULL;
2,207✔
624
  SQuery*          pQuery = NULL;
2,207✔
625
  SHashObj*        pVgroupHashmap = NULL;
2,207✔
626

627
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
2,207✔
628
  uDebug(LOG_ID_TAG " drop table, meta:%p, len:%d", LOG_ID_VALUE, meta, metaLen);
2,207✔
629

630
  pRequest->syncQuery = true;
2,207✔
631
  if (!pRequest->pDb) {
2,207✔
UNCOV
632
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
×
UNCOV
633
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
UNCOV
634
    goto end;
×
635
  }
636
  // decode and process req
637
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
2,207✔
638
  uint32_t len = metaLen - sizeof(SMsgHead);
2,207✔
639
  tDecoderInit(&coder, data, len);
2,207✔
640
  RAW_RETURN_CHECK(tDecodeSVDropTbBatchReq(&coder, &req));
2,207✔
641
  STscObj* pTscObj = pRequest->pTscObj;
2,207✔
642

643
  SVDropTbReq* pDropReq = NULL;
2,207✔
644
  SCatalog*    pCatalog = NULL;
2,207✔
645
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
2,207✔
646

647
  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
2,207✔
648
  RAW_NULL_CHECK(pVgroupHashmap);
2,207✔
649
  taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);
2,207✔
650

651
  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
2,207✔
652
                           .requestId = pRequest->requestId,
2,207✔
653
                           .requestObjRefId = pRequest->self,
2,207✔
654
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
2,207✔
655
  pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
2,207✔
656
  RAW_NULL_CHECK(pRequest->tableList);
2,207✔
657
  // loop to create table
658
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
5,057✔
659
    pDropReq = req.pReqs + iReq;
2,850✔
660
    pDropReq->igNotExists = true;
2,850✔
661
    //    pDropReq->suid = processSuid(pDropReq->suid, pRequest->pDb);
662

663
    SVgroupInfo pInfo = {0};
2,850✔
664
    SName       pName = {0};
2,850✔
665
    toName(pTscObj->acctId, pRequest->pDb, pDropReq->name, &pName);
2,850✔
666
    code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
2,850✔
667
    PROCESS_TABLE_NOT_EXIST(code, pDropReq->name)
2,850✔
668
    STableMeta* pTableMeta = NULL;
2,850✔
669
    code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
2,850✔
670
    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
2,850✔
671
      code = TSDB_CODE_SUCCESS;
614✔
672
      uInfo(LOG_ID_TAG " table %s not exist, ignore drop", LOG_ID_VALUE, pDropReq->name);
614✔
673
      taosMemoryFreeClear(pTableMeta);
614✔
674
      continue;
614✔
675
    }
676
    RAW_RETURN_CHECK(code);
2,236✔
677
    tb_uid_t oldSuid = pDropReq->suid;
2,236✔
678
    pDropReq->suid = pTableMeta->suid;
2,236✔
679
    taosMemoryFreeClear(pTableMeta);
2,236✔
680
    uDebug(LOG_ID_TAG " drop table name:%s suid:%" PRId64 " new suid:%" PRId64, LOG_ID_VALUE, pDropReq->name, oldSuid,
2,236✔
681
           pDropReq->suid);
682

683
    RAW_NULL_CHECK(taosArrayPush(pRequest->tableList, &pName));
4,472✔
684
    SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
2,236✔
685
    if (pTableBatch == NULL) {
2,236✔
686
      SVgroupDropTableBatch tBatch = {0};
1,593✔
687
      tBatch.info = pInfo;
1,593✔
688
      tBatch.req.pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
1,593✔
689
      RAW_NULL_CHECK(tBatch.req.pArray);
1,593✔
690
      RAW_NULL_CHECK(taosArrayPush(tBatch.req.pArray, pDropReq));
3,186✔
691
      RAW_RETURN_CHECK(taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch)));
1,593✔
692
    } else {  // add to the correct vgroup
693
      RAW_NULL_CHECK(taosArrayPush(pTableBatch->req.pArray, pDropReq));
1,286✔
694
    }
695
  }
696

697
  if (taosHashGetSize(pVgroupHashmap) == 0) {
2,207✔
698
    goto end;
614✔
699
  }
700
  SArray* pBufArray = NULL;
1,593✔
701
  RAW_RETURN_CHECK(serializeVgroupsDropTableBatch(pVgroupHashmap, &pBufArray));
1,593✔
702
  code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
1,593✔
703
  if (TSDB_CODE_SUCCESS != code) goto end;
1,593✔
704
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
1,593✔
705
  pQuery->msgType = TDMT_VND_DROP_TABLE;
1,593✔
706
  pQuery->stableQuery = false;
1,593✔
707
  pQuery->pRoot = NULL;
1,593✔
708
  code = nodesMakeNode(QUERY_NODE_DROP_TABLE_STMT, &pQuery->pRoot);
1,593✔
709
  if (TSDB_CODE_SUCCESS != code) goto end;
1,593✔
710
  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pBufArray));
1,593✔
711

712
  launchQueryImpl(pRequest, pQuery, true, NULL);
1,593✔
713
  if (pRequest->code == TSDB_CODE_SUCCESS) {
1,593✔
714
    RAW_RETURN_CHECK(removeMeta(pTscObj, pRequest->tableList, false));
1,593✔
715
  }
716
  code = pRequest->code;
1,593✔
717
  uDebug(LOG_ID_TAG " drop table return, msg:%s", LOG_ID_VALUE, tstrerror(code));
1,593✔
718

719
end:
2,207✔
720
  taosHashCleanup(pVgroupHashmap);
2,207✔
721
  destroyRequest(pRequest);
2,207✔
722
  tDecoderClear(&coder);
2,207✔
723
  qDestroyQuery(pQuery);
2,207✔
724
  RAW_LOG_END
2,207✔
725
  return code;
2,207✔
726
}
727

728
static int32_t taosDeleteData(TAOS* taos, void* meta, uint32_t metaLen) {
934✔
729
  if (taos == NULL || meta == NULL) {
934✔
UNCOV
730
    uError("invalid parameter in %s", __func__);
×
UNCOV
731
    return TSDB_CODE_INVALID_PARA;
×
732
  }
733
  SDeleteRes req = {0};
934✔
734
  SDecoder   coder = {0};
934✔
735
  char       sql[256] = {0};
934✔
736
  int32_t    code = TSDB_CODE_SUCCESS;
934✔
737
  int32_t    lino = 0;
934✔
738
  uDebug("connId:0x%" PRIx64 " delete data, meta:%p, len:%d", *(int64_t*)taos, meta, metaLen);
934✔
739

740
  // decode and process req
741
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
934✔
742
  uint32_t len = metaLen - sizeof(SMsgHead);
934✔
743
  tDecoderInit(&coder, data, len);
934✔
744
  RAW_RETURN_CHECK(tDecodeDeleteRes(&coder, &req));
934✔
745
  (void)snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64, req.tableFName,
934✔
746
                 req.tsColName, req.skey, req.tsColName, req.ekey);
747

748
  TAOS_RES* res = taosQueryImpl(taos, sql, false, TD_REQ_FROM_TAOX);
934✔
749
  RAW_NULL_CHECK(res);
934✔
750
  SRequestObj* pRequest = (SRequestObj*)res;
934✔
751
  code = pRequest->code;
934✔
752
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_PAR_GET_META_ERROR) {
934✔
753
    code = TSDB_CODE_SUCCESS;
307✔
754
  }
755
  taos_free_result(res);
934✔
756
  uDebug("connId:0x%" PRIx64 " delete data sql:%s, code:%s", *(int64_t*)taos, sql, tstrerror(code));
934✔
757

758
end:
934✔
759
  RAW_LOG_END
934✔
760
  tDecoderClear(&coder);
934✔
761
  return code;
934✔
762
}
763

764
static int32_t taosAlterTable(TAOS* taos, void* meta, uint32_t metaLen) {
15,611✔
765
  if (taos == NULL || meta == NULL) {
15,611✔
UNCOV
766
    uError("invalid parameter in %s", __func__);
×
UNCOV
767
    return TSDB_CODE_INVALID_PARA;
×
768
  }
769
  SVAlterTbReq   req = {0};
15,611✔
770
  SDecoder       dcoder = {0};
15,611✔
771
  int32_t        code = TSDB_CODE_SUCCESS;
15,611✔
772
  int32_t        lino = 0;
15,611✔
773
  SRequestObj*   pRequest = NULL;
15,611✔
774
  SQuery*        pQuery = NULL;
15,611✔
775
  SArray*        pArray = NULL;
15,611✔
776
  SVgDataBlocks* pVgData = NULL;
15,611✔
777
  SArray*        pVgList = NULL;
15,611✔
778
  SEncoder       coder = {0};
15,611✔
779
  SHashObj*      pVgroupHashmap = NULL;
15,611✔
780

781
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
15,611✔
782
  uDebug(LOG_ID_TAG " alter table, meta:%p, len:%d", LOG_ID_VALUE, meta, metaLen);
15,611✔
783
  pRequest->syncQuery = true;
15,611✔
784
  if (!pRequest->pDb) {
15,611✔
UNCOV
785
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
×
UNCOV
786
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
UNCOV
787
    goto end;
×
788
  }
789
  // decode and process req
790
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
15,611✔
791
  uint32_t len = metaLen - sizeof(SMsgHead);
15,611✔
792
  tDecoderInit(&dcoder, data, len);
15,611✔
793
  RAW_RETURN_CHECK(tDecodeSVAlterTbReq(&dcoder, &req));
15,611✔
794
  // do not deal TSDB_ALTER_TABLE_UPDATE_OPTIONS
795
  if (req.action == TSDB_ALTER_TABLE_UPDATE_OPTIONS) {
15,611✔
796
    uInfo(LOG_ID_TAG " alter table action is UPDATE_OPTIONS, ignore", LOG_ID_VALUE);
1,254✔
797
    goto end;
1,254✔
798
  }
799

800
  STscObj*  pTscObj = pRequest->pTscObj;
14,357✔
801
  SCatalog* pCatalog = NULL;
14,357✔
802
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
14,357✔
803
  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
14,357✔
804
                           .requestId = pRequest->requestId,
14,357✔
805
                           .requestObjRefId = pRequest->self,
14,357✔
806
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
14,357✔
807

808
  // Handle Type 1 batch modification with vnode grouping
809
  if (req.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TABLE_TAG_VAL) {
14,357✔
810
    if (req.tables == NULL || taosArrayGetSize(req.tables) == 0) {
4,560✔
UNCOV
811
      uError(LOG_ID_TAG " Type 1 batch alter has empty tables array", LOG_ID_VALUE);
×
UNCOV
812
      code = TSDB_CODE_INVALID_PARA;
×
UNCOV
813
      goto end;
×
814
    }
815

816
    int32_t nTables = taosArrayGetSize(req.tables);
4,560✔
817
    uDebug(LOG_ID_TAG " Type 1 batch alter with %d tables, grouping by vnode", LOG_ID_VALUE, nTables);
4,560✔
818

819
    // Create hashmap to group tables by vgId
820
    pVgroupHashmap = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
4,560✔
821
    RAW_NULL_CHECK(pVgroupHashmap);
4,560✔
822

823
    // Group tables by vnode
824
    for (int32_t i = 0; i < nTables; i++) {
9,880✔
825
      SUpdateTableTagVal* pTable = taosArrayGet(req.tables, i);
5,320✔
826
      if (pTable == NULL || pTable->tbName == NULL) {
5,320✔
UNCOV
827
        uWarn(LOG_ID_TAG " Type 1 batch alter table[%d] has invalid name, skip", LOG_ID_VALUE, i);
×
UNCOV
828
        continue;
×
829
      }
830

831
      // Query vnode for this table
832
      SVgroupInfo vgInfo = {0};
5,320✔
833
      SName pName = {0};
5,320✔
834
      toName(pTscObj->acctId, pRequest->pDb, pTable->tbName, &pName);
5,320✔
835
      code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgInfo);
5,320✔
836
      PROCESS_TABLE_NOT_EXIST(code, pTable->tbName)
5,320✔
837

838
      // Add table to corresponding vnode's array
839
      SArray** ppTables = taosHashGet(pVgroupHashmap, &vgInfo.vgId, sizeof(int32_t));
5,320✔
840
      if (ppTables == NULL) {
5,320✔
841
        SArray* pTables = taosArrayInit(16, sizeof(SUpdateTableTagVal));
4,940✔
842
        RAW_NULL_CHECK(pTables);
4,940✔
843
        RAW_RETURN_CHECK(taosHashPut(pVgroupHashmap, &vgInfo.vgId, sizeof(int32_t), &pTables, sizeof(void*)));
4,940✔
844
        ppTables = taosHashGet(pVgroupHashmap, &vgInfo.vgId, sizeof(int32_t));
4,940✔
845
      }
846

847
      SArray* pTables = *ppTables;
5,320✔
848
      RAW_NULL_CHECK(taosArrayPush(pTables, pTable));
5,320✔
849
    }
850

851
    // Build and send separate request for each vnode
852
    pArray = taosArrayInit(taosHashGetSize(pVgroupHashmap), sizeof(void*));
4,560✔
853
    RAW_NULL_CHECK(pArray);
4,560✔
854

855
    void* pIter = taosHashIterate(pVgroupHashmap, NULL);
4,560✔
856
    while (pIter) {
9,500✔
857
      size_t keyLen = 0;
4,940✔
858
      int32_t* pVgId = taosHashGetKey(pIter, &keyLen);
4,940✔
859
      SArray* pTables = *(SArray**)pIter;
4,940✔
860
      int32_t nTablesInVg = taosArrayGetSize(pTables);
4,940✔
861

862
      uDebug(LOG_ID_TAG " Type 1 batch alter: vgId:%d has %d tables", LOG_ID_VALUE, *pVgId, nTablesInVg);
4,940✔
863

864
      // Build SVAlterTbReq for this vnode
865
      SVAlterTbReq vgReq = {0};
4,940✔
866
      vgReq.action = req.action;
4,940✔
867
      vgReq.tbName = req.tbName;
4,940✔
868
      vgReq.source = TD_REQ_FROM_TAOX;
4,940✔
869
      vgReq.tables = pTables;
4,940✔
870

871
      // Encode request
872
      int tlen = 0;
4,940✔
873
      tEncodeSize(tEncodeSVAlterTbReq, &vgReq, tlen, code);
4,940✔
874
      if (code < 0) {
4,940✔
UNCOV
875
        uError(LOG_ID_TAG " Type 1 batch alter encode failed for vgId:%d, code:%s",
×
876
               LOG_ID_VALUE, *pVgId, tstrerror(code));
UNCOV
877
        taosHashCancelIterate(pVgroupHashmap, pIter);
×
UNCOV
878
        goto end;
×
879
      }
880

881
      tlen += sizeof(SMsgHead);
4,940✔
882
      void* pMsg = taosMemoryMalloc(tlen);
4,940✔
883
      if (pMsg == NULL) {
4,940✔
UNCOV
884
        code = terrno;
×
UNCOV
885
        uError(LOG_ID_TAG " Type 1 batch alter malloc failed for vgId:%d, size:%d",
×
886
               LOG_ID_VALUE, *pVgId, tlen);
UNCOV
887
        taosHashCancelIterate(pVgroupHashmap, pIter);
×
UNCOV
888
        goto end;
×
889
      }
890

891
      ((SMsgHead*)pMsg)->vgId = htonl(*pVgId);
4,940✔
892
      ((SMsgHead*)pMsg)->contLen = htonl(tlen);
4,940✔
893
      void* pBuf = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
4,940✔
894

895
      SEncoder vgCoder = {0};
4,940✔
896
      tEncoderInit(&vgCoder, pBuf, tlen - sizeof(SMsgHead));
4,940✔
897
      code = tEncodeSVAlterTbReq(&vgCoder, &vgReq);
4,940✔
898
      tEncoderClear(&vgCoder);
4,940✔
899

900
      if (code < 0) {
4,940✔
901
        uError(LOG_ID_TAG " Type 1 batch alter encode2 failed for vgId:%d, code:%s",
×
902
               LOG_ID_VALUE, *pVgId, tstrerror(code));
UNCOV
903
        taosMemoryFree(pMsg);
×
UNCOV
904
        taosHashCancelIterate(pVgroupHashmap, pIter);
×
UNCOV
905
        goto end;
×
906
      }
907

908
      // Query vgroup info for first table to get endpoint
909
      SUpdateTableTagVal* pFirstTable = taosArrayGet(pTables, 0);
4,940✔
910
      SVgroupInfo vgInfo = {0};
4,940✔
911
      SName pName = {0};
4,940✔
912
      toName(pTscObj->acctId, pRequest->pDb, pFirstTable->tbName, &pName);
4,940✔
913
      code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgInfo);
4,940✔
914
      if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
4,940✔
UNCOV
915
        taosMemoryFree(pMsg);
×
UNCOV
916
        pIter = taosHashIterate(pVgroupHashmap, pIter);
×
UNCOV
917
        code = TSDB_CODE_SUCCESS;
×
UNCOV
918
        continue;
×
919
      }
920
      if (code != TSDB_CODE_SUCCESS) {
4,940✔
UNCOV
921
        taosMemoryFree(pMsg);
×
UNCOV
922
        taosHashCancelIterate(pVgroupHashmap, pIter);
×
UNCOV
923
        goto end;
×
924
      }
925

926
      // Create VgDataBlocks for this vnode
927
      pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
4,940✔
928
      if (pVgData == NULL) {
4,940✔
UNCOV
929
        code = terrno;
×
930
        taosMemoryFree(pMsg);
×
931
        taosHashCancelIterate(pVgroupHashmap, pIter);
×
UNCOV
932
        goto end;
×
933
      }
934
      pVgData->vg = vgInfo;
4,940✔
935
      pVgData->pData = pMsg;
4,940✔
936
      pVgData->size = tlen;
4,940✔
937
      pVgData->numOfTables = nTablesInVg;
4,940✔
938

939
      if (taosArrayPush(pArray, &pVgData) == NULL) {
4,940✔
UNCOV
940
        code = terrno;
×
UNCOV
941
        taosMemoryFree(pMsg);
×
UNCOV
942
        taosHashCancelIterate(pVgroupHashmap, pIter);
×
UNCOV
943
        goto end;
×
944
      }
945

946
      pVgData = NULL;  // Ownership transferred to pArray
4,940✔
947
      pIter = taosHashIterate(pVgroupHashmap, pIter);
4,940✔
948
    }
949

950
    uInfo(LOG_ID_TAG " Type 1 batch alter: grouped %d tables into %d vnodes",
4,560✔
951
          LOG_ID_VALUE, nTables, (int32_t)taosArrayGetSize(pArray));
952
  } else if (req.action == TSDB_ALTER_TABLE_UPDATE_CHILD_TABLE_TAG_VAL) {
9,797✔
953
    char dbFName[TSDB_DB_FNAME_LEN] = {0};
1,729✔
954
    SName pName = {TSDB_TABLE_NAME_T, pTscObj->acctId, {0}, {0}};
1,729✔
955
    tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname));
1,729✔
956
    (void)tNameGetFullDbName(&pName, dbFName);
1,729✔
957
    RAW_RETURN_CHECK(catalogGetDBVgList(pCatalog, &conn, dbFName, &pVgList));
1,729✔
958

959
    pArray = taosArrayInit(taosArrayGetSize(pVgList), sizeof(void*));
1,729✔
960
    RAW_NULL_CHECK(pArray);
1,729✔
961
    for (int i = 0; i < taosArrayGetSize(pVgList); ++i) {
4,218✔
962
      SVgroupInfo* pInfo = (SVgroupInfo*)taosArrayGet(pVgList, i);
2,489✔
963
      pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
2,489✔
964
      RAW_NULL_CHECK(pVgData);
2,489✔
965
      pVgData->vg = *pInfo;
2,489✔
966

967
      int tlen = 0;
2,489✔
968
      req.source = TD_REQ_FROM_TAOX;
2,489✔
969

970
      tEncodeSize(tEncodeSVAlterTbReq, &req, tlen, code);
2,489✔
971
      RAW_RETURN_CHECK(code);
2,489✔
972
      tlen += sizeof(SMsgHead);
2,489✔
973
      void* pMsg = taosMemoryMalloc(tlen);
2,489✔
974
      RAW_NULL_CHECK(pMsg);
2,489✔
975
      ((SMsgHead*)pMsg)->vgId = htonl(pInfo->vgId);
2,489✔
976
      ((SMsgHead*)pMsg)->contLen = htonl(tlen);
2,489✔
977
      void* pBuf = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
2,489✔
978
      tEncoderInit(&coder, pBuf, tlen - sizeof(SMsgHead));
2,489✔
979
      RAW_RETURN_CHECK(tEncodeSVAlterTbReq(&coder, &req));
2,489✔
980
      tEncoderClear(&coder);
2,489✔
981

982
      pVgData->pData = pMsg;
2,489✔
983
      pVgData->size = tlen;
2,489✔
984

985
      pVgData->numOfTables = 1;
2,489✔
986
      RAW_NULL_CHECK(taosArrayPush(pArray, &pVgData));
2,489✔
987
      pVgData = NULL;  // Ownership transferred to pArray
2,489✔
988
    }
989
  } else {
990
    // Single table or Type 2 modification - original logic
991
    SVgroupInfo pInfo = {0};
8,068✔
992
    SName       pName = {0};
8,068✔
993
    toName(pTscObj->acctId, pRequest->pDb, req.tbName, &pName);
8,068✔
994
    code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
8,068✔
995
    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
8,068✔
UNCOV
996
      code = TSDB_CODE_SUCCESS;
×
UNCOV
997
      goto end;
×
998
    }
999
    RAW_RETURN_CHECK(code);
8,068✔
1000
    pArray = taosArrayInit(1, sizeof(void*));
8,068✔
1001
    RAW_NULL_CHECK(pArray);
8,068✔
1002

1003
    pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
8,068✔
1004
    RAW_NULL_CHECK(pVgData);
8,068✔
1005
    pVgData->vg = pInfo;
8,068✔
1006

1007
    int tlen = 0;
8,068✔
1008
    req.source = TD_REQ_FROM_TAOX;
8,068✔
1009

1010
    if (strlen(tmqWriteRefDB) > 0) {
8,068✔
1011
      req.refDbName = tmqWriteRefDB;
3,052✔
1012
    }
1013

1014
    if (req.action == TSDB_ALTER_TABLE_ALTER_COLUMN_REF && tmqWriteCheckRef) {
8,068✔
1015
      RAW_RETURN_CHECK(checkColRefForAlter(pCatalog, &conn, pTscObj->acctId, req.refDbName, req.refTbName, req.refColName,
380✔
1016
        pRequest->pDb, req.tbName, req.colName));
1017
    }else if (req.action == TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COLUMN_REF && tmqWriteCheckRef) {
7,688✔
1018
      RAW_RETURN_CHECK(checkColRefForAdd(pCatalog, &conn, pTscObj->acctId, req.refDbName, req.refTbName, req.refColName,
×
1019
        pRequest->pDb, req.tbName, req.colName, req.type, req.bytes, req.typeMod));
1020
    }
1021

1022
    tEncodeSize(tEncodeSVAlterTbReq, &req, tlen, code);
8,068✔
1023
    RAW_RETURN_CHECK(code);
8,068✔
1024
    tlen += sizeof(SMsgHead);
8,068✔
1025
    void* pMsg = taosMemoryMalloc(tlen);
8,068✔
1026
    RAW_NULL_CHECK(pMsg);
8,068✔
1027
    ((SMsgHead*)pMsg)->vgId = htonl(pInfo.vgId);
8,068✔
1028
    ((SMsgHead*)pMsg)->contLen = htonl(tlen);
8,068✔
1029
    void* pBuf = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
8,068✔
1030
    tEncoderInit(&coder, pBuf, tlen - sizeof(SMsgHead));
8,068✔
1031
    RAW_RETURN_CHECK(tEncodeSVAlterTbReq(&coder, &req));
8,068✔
1032

1033
    pVgData->pData = pMsg;
8,068✔
1034
    pVgData->size = tlen;
8,068✔
1035

1036
    pVgData->numOfTables = 1;
8,068✔
1037
    RAW_NULL_CHECK(taosArrayPush(pArray, &pVgData));
8,068✔
1038
    pVgData = NULL;
8,068✔
1039
  }
1040

1041
  pQuery = NULL;
14,357✔
1042
  RAW_RETURN_CHECK(nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery));
14,357✔
1043
  if (NULL == pQuery) goto end;
14,357✔
1044
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
14,357✔
1045
  pQuery->msgType = TDMT_VND_ALTER_TABLE;
14,357✔
1046
  pQuery->stableQuery = false;
14,357✔
1047
  pQuery->pRoot = NULL;
14,357✔
1048
  RAW_RETURN_CHECK(nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT, &pQuery->pRoot));
14,357✔
1049
  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pArray));
14,357✔
1050

1051
  launchQueryImpl(pRequest, pQuery, true, NULL);
14,357✔
1052
  pArray = NULL;
14,357✔
1053

1054
  code = pRequest->code;
14,357✔
1055
  if (code == TSDB_CODE_TDB_TABLE_NOT_EXIST || code == TSDB_CODE_NOT_FOUND) {
14,357✔
1056
    code = TSDB_CODE_SUCCESS;
1,010✔
1057
  }
1058

1059
  if (pRequest->code == TSDB_CODE_SUCCESS) {
14,357✔
1060
    SExecResult* pRes = &pRequest->body.resInfo.execRes;
13,347✔
1061
    if (pRes->res != NULL) {
13,347✔
1062
      code = handleAlterTbExecRes(pRes->res, pCatalog);
8,068✔
1063
    }
1064
  }
1065
  uDebug(LOG_ID_TAG " alter table return, meta:%p, len:%d, msg:%s", LOG_ID_VALUE, meta, metaLen, tstrerror(code));
14,357✔
1066

1067
end:
15,611✔
1068
  // Cleanup vnode grouping hashmap
1069
  if (pVgroupHashmap != NULL) {
15,611✔
1070
    void* pIter = taosHashIterate(pVgroupHashmap, NULL);
4,560✔
1071
    while (pIter) {
9,500✔
1072
      SArray* pTables = *(SArray**)pIter;
4,940✔
1073
      taosArrayDestroy(pTables);
4,940✔
1074
      pIter = taosHashIterate(pVgroupHashmap, pIter);
4,940✔
1075
    }
1076
    taosHashCleanup(pVgroupHashmap);
4,560✔
1077
  }
1078

1079
  for (int i = 0; i < taosArrayGetSize(pArray); ++i) {
15,611✔
UNCOV
1080
    SVgDataBlocks* pData = (SVgDataBlocks*)taosArrayGetP(pArray, i);
×
UNCOV
1081
    if (pData && pData->pData) {
×
UNCOV
1082
      taosMemoryFreeClear(pData->pData);
×
UNCOV
1083
      taosMemoryFreeClear(pData);
×
1084
    }
1085
  }
1086
  taosArrayDestroy(pArray);
15,611✔
1087
  
1088
  if (pVgData) {
15,611✔
UNCOV
1089
    taosMemoryFreeClear(pVgData->pData);
×
UNCOV
1090
    taosMemoryFreeClear(pVgData);
×
1091
  }
1092
  taosArrayDestroy(pVgList);
15,611✔
1093
  destroyRequest(pRequest);
15,611✔
1094
  tDecoderClear(&dcoder);
15,611✔
1095
  qDestroyQuery(pQuery);
15,611✔
1096
  destroyAlterTbReq(&req);
15,611✔
1097
  tEncoderClear(&coder);
15,611✔
1098
  RAW_LOG_END
15,611✔
1099
  return code;
15,611✔
1100
}
1101

1102
int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const char* tbname, TAOS_FIELD* fields,
307✔
1103
                                     int numFields) {
1104
  return taos_write_raw_block_with_fields_with_reqid(taos, rows, pData, tbname, fields, numFields, 0);
307✔
1105
}
1106

1107
int taos_write_raw_block_with_fields_with_reqid(TAOS* taos, int rows, char* pData, const char* tbname,
2,456✔
1108
                                                TAOS_FIELD* fields, int numFields, int64_t reqid) {
1109
  if (taos == NULL || pData == NULL || tbname == NULL) {
2,456✔
1110
    uError("invalid parameter in %s, taos:%p, pData:%p, tbname:%p", __func__, taos, pData, tbname);
×
1111
    return TSDB_CODE_INVALID_PARA;
×
1112
  }
1113
  int32_t     code = TSDB_CODE_SUCCESS;
2,456✔
1114
  int32_t     lino = 0;
2,456✔
1115
  STableMeta* pTableMeta = NULL;
2,456✔
1116
  SQuery*     pQuery = NULL;
2,456✔
1117
  SHashObj*   pVgHash = NULL;
2,456✔
1118

1119
  SRequestObj* pRequest = NULL;
2,456✔
1120
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, reqid));
2,456✔
1121

1122
  uDebug(LOG_ID_TAG " write raw block, rows:%d, pData:%p, tbname:%s, fields:%p, numFields:%d", LOG_ID_VALUE,
2,456✔
1123
         rows, pData, tbname, fields, numFields);
1124

1125
  pRequest->syncQuery = true;
2,456✔
1126
  if (!pRequest->pDb) {
2,456✔
1127
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
×
1128
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
UNCOV
1129
    goto end;
×
1130
  }
1131

1132
  SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
2,456✔
1133
  tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname));
2,456✔
1134
  tstrncpy(pName.tname, tbname, sizeof(pName.tname));
2,456✔
1135

1136
  struct SCatalog* pCatalog = NULL;
2,456✔
1137
  RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));
2,456✔
1138

1139
  SRequestConnInfo conn = {0};
2,456✔
1140
  conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
2,456✔
1141
  conn.requestId = pRequest->requestId;
2,456✔
1142
  conn.requestObjRefId = pRequest->self;
2,456✔
1143
  conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
2,456✔
1144

1145
  SVgroupInfo vgData = {0};
2,456✔
1146
  RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData));
2,456✔
1147
  RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta));
2,456✔
1148
  uDebug(LOG_ID_TAG " write raw block got meta, tbname:%s, numOfColumns:%d", LOG_ID_VALUE,
2,149✔
1149
         tbname, pTableMeta->tableInfo.numOfColumns);
1150
  RAW_RETURN_CHECK(smlInitHandle(&pQuery));
2,149✔
1151
  pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
2,149✔
1152
  RAW_NULL_CHECK(pVgHash);
2,149✔
1153
  RAW_RETURN_CHECK(
2,149✔
1154
      taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData)));
1155
  RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false, NULL, 0, false));
2,149✔
1156
  RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
1,535✔
1157

1158
  launchQueryImpl(pRequest, pQuery, true, NULL);
1,535✔
1159
  code = pRequest->code;
1,535✔
1160
  uDebug(LOG_ID_TAG " write raw block return, tbname:%s, msg:%s", LOG_ID_VALUE, tbname, tstrerror(code));
1,535✔
1161

1162
end:
2,456✔
1163
  taosMemoryFreeClear(pTableMeta);
2,456✔
1164
  qDestroyQuery(pQuery);
2,456✔
1165
  destroyRequest(pRequest);
2,456✔
1166
  taosHashCleanup(pVgHash);
2,456✔
1167
  RAW_LOG_END
2,456✔
1168
  return code;
2,456✔
1169
}
1170

1171
int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) {
2,149✔
1172
  return taos_write_raw_block_with_fields_with_reqid(taos, rows, pData, tbname, NULL, 0, 0);
2,149✔
1173
}
1174

UNCOV
1175
int taos_write_raw_block_with_reqid(TAOS* taos, int rows, char* pData, const char* tbname, int64_t reqid) {
×
UNCOV
1176
  return taos_write_raw_block_with_fields_with_reqid(taos, rows, pData, tbname, NULL, 0, reqid);
×
1177
}
1178

1179
static void* getRawDataFromRes(void* pRetrieve) {
53,887✔
1180
  if (pRetrieve == NULL) {
53,887✔
UNCOV
1181
    uError("invalid parameter in %s", __func__);
×
UNCOV
1182
    return NULL;
×
1183
  }
1184
  void* rawData = NULL;
53,887✔
1185
  // deal with compatibility
1186
  if (*(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_VERSION) {
53,887✔
UNCOV
1187
    rawData = ((SRetrieveTableRsp*)pRetrieve)->data;
×
1188
  } else if (*(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_TMQ_VERSION ||
53,887✔
UNCOV
1189
             *(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_TMQ_RAW_VERSION) {
×
1190
    rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
53,887✔
1191
  }
1192
  return rawData;
53,887✔
1193
}
1194

1195
static int32_t buildCreateTbMap(SMqDataRsp* rsp, SHashObj* pHashObj) {
3,198✔
1196
  if (rsp == NULL || pHashObj == NULL) {
3,198✔
UNCOV
1197
    uError("invalid parameter in %s", __func__);
×
UNCOV
1198
    return TSDB_CODE_INVALID_PARA;
×
1199
  }
1200
  // find schema data info
1201
  int32_t       code = 0;
3,198✔
1202
  int32_t       lino = 0;
3,198✔
1203
  SVCreateTbReq pCreateReq = {0};
3,198✔
1204
  SDecoder      decoderTmp = {0};
3,198✔
1205
  RAW_LOG_START
3,198✔
1206
  for (int j = 0; j < rsp->createTableNum; j++) {
8,344✔
1207
    void** dataTmp = taosArrayGet(rsp->createTableReq, j);
5,146✔
1208
    RAW_NULL_CHECK(dataTmp);
5,146✔
1209
    int32_t* lenTmp = taosArrayGet(rsp->createTableLen, j);
5,146✔
1210
    RAW_NULL_CHECK(lenTmp);
5,146✔
1211

1212
    tDecoderInit(&decoderTmp, *dataTmp, *lenTmp);
5,146✔
1213
    RAW_RETURN_CHECK(tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq));
5,146✔
1214

1215
    if (pCreateReq.type != TSDB_CHILD_TABLE) {
5,146✔
UNCOV
1216
      uError("invalid table type %d in %s", pCreateReq.type, __func__);
×
UNCOV
1217
      code = TSDB_CODE_INVALID_MSG;
×
UNCOV
1218
      goto end;
×
1219
    }
1220
    if (taosHashGet(pHashObj, pCreateReq.name, strlen(pCreateReq.name)) == NULL) {
5,146✔
1221
      RAW_RETURN_CHECK(
5,146✔
1222
          taosHashPut(pHashObj, pCreateReq.name, strlen(pCreateReq.name), &pCreateReq, sizeof(SVCreateTbReq)));
1223
    } else {
UNCOV
1224
      tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
×
1225
    }
1226

1227
    tDecoderClear(&decoderTmp);
5,146✔
1228
    pCreateReq = (SVCreateTbReq){0};
5,146✔
1229
  }
1230

1231
end:
3,198✔
1232
  tDecoderClear(&decoderTmp);
3,198✔
1233
  tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
3,198✔
1234
  RAW_LOG_END
3,198✔
1235
  return code;
3,198✔
1236
}
1237

1238
typedef enum {
1239
  WRITE_RAW_INIT_START = 0,
1240
  WRITE_RAW_INIT_OK,
1241
  WRITE_RAW_INIT_FAIL,
1242
} WRITE_RAW_INIT_STATUS;
1243

1244
static SHashObj* writeRawCache = NULL;
1245
static int8_t    initFlag = 0;
1246
static int8_t    initedFlag = WRITE_RAW_INIT_START;
1247

1248
typedef struct {
1249
  SHashObj* pVgHash;
1250
  SHashObj* pNameHash;
1251
  SHashObj* pMetaHash;
1252
} rawCacheInfo;
1253

1254
typedef struct {
1255
  SVgroupInfo vgInfo;
1256
  int64_t     uid;
1257
  int64_t     suid;
1258
} tbInfo;
1259

1260
static void tmqFreeMeta(void* data) {
14,758✔
1261
  if (data == NULL) {
14,758✔
1262
    uError("invalid parameter in %s", __func__);
×
1263
    return;
×
1264
  }
1265
  STableMeta* pTableMeta = *(STableMeta**)data;
14,758✔
1266
  taosMemoryFree(pTableMeta);
14,758✔
1267
}
1268

UNCOV
1269
static void freeRawCache(void* data) {
×
UNCOV
1270
  if (data == NULL) {
×
UNCOV
1271
    uError("invalid parameter in %s", __func__);
×
UNCOV
1272
    return;
×
1273
  }
UNCOV
1274
  rawCacheInfo* pRawCache = (rawCacheInfo*)data;
×
1275
  taosHashCleanup(pRawCache->pMetaHash);
×
1276
  taosHashCleanup(pRawCache->pNameHash);
×
UNCOV
1277
  taosHashCleanup(pRawCache->pVgHash);
×
1278
}
1279

1280
static int32_t initRawCacheHash() {
7,383✔
1281
  if (writeRawCache == NULL) {
7,383✔
1282
    writeRawCache = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
7,383✔
1283
    if (writeRawCache == NULL) {
7,383✔
UNCOV
1284
      return terrno;
×
1285
    }
1286
    taosHashSetFreeFp(writeRawCache, freeRawCache);
7,383✔
1287
  }
1288
  return 0;
7,383✔
1289
}
1290

1291
static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrapper* pSW) {
4,047✔
1292
  if (rawData == NULL || pSW == NULL) {
4,047✔
UNCOV
1293
    return false;
×
1294
  }
1295
  if (pTableMeta == NULL) {
4,047✔
UNCOV
1296
    uError("invalid parameter in %s", __func__);
×
UNCOV
1297
    return false;
×
1298
  }
1299
  char* p = (char*)rawData;
4,047✔
1300
  // Raw block header layout:
1301
  // | version(int32) | totalLen(int32) | totalRows(int32) | blankFill(int32) | totalCols(int32) | flagSeg(uint64) | columnSchema... | colLen... |
1302
  int32_t rawBlockHeaderSize = sizeof(int32_t) * 5 + sizeof(uint64_t);
4,047✔
1303
  p += rawBlockHeaderSize;
4,047✔
1304
  int8_t* fields = (int8_t*)p;
4,047✔
1305

1306
  if (pSW->nCols != pTableMeta->tableInfo.numOfColumns) {
4,047✔
1307
    return true;
1,552✔
1308
  }
1309

1310
  for (int i = 0; i < pSW->nCols; i++) {
13,102✔
1311
    int j = 0;
10,607✔
1312
    for (; j < pTableMeta->tableInfo.numOfColumns; j++) {
28,085✔
1313
      SSchema*    pColSchema = &pTableMeta->schema[j];
28,085✔
1314
      SSchemaExt* pColExtSchema = &pTableMeta->schemaExt[j];
28,085✔
1315
      char*       fieldName = pSW->pSchema[i].name;
28,085✔
1316

1317
      if (strcmp(pColSchema->name, fieldName) == 0) {
28,085✔
1318
        if (checkSchema(pColSchema, pColExtSchema, fields, NULL, 0) != 0) {
10,607✔
1319
          return true;
×
1320
        }
1321
        break;
10,607✔
1322
      }
1323
    }
1324
    fields += sizeof(int8_t) + sizeof(int32_t);
10,607✔
1325

1326
    if (j == pTableMeta->tableInfo.numOfColumns) return true;
10,607✔
1327
  }
1328
  return false;
2,495✔
1329
}
1330

1331
static int32_t getRawCache(SHashObj** pVgHash, SHashObj** pNameHash, SHashObj** pMetaHash, void* key) {
23,850✔
1332
  if (pVgHash == NULL || pNameHash == NULL || pMetaHash == NULL || key == NULL) {
23,850✔
1333
    uError("invalid parameter in %s", __func__);
×
UNCOV
1334
    return TSDB_CODE_INVALID_PARA;
×
1335
  }
1336
  int32_t code = 0;
23,850✔
1337
  int32_t lino = 0;
23,850✔
1338
  RAW_LOG_START
23,850✔
1339
  void* cacheInfo = taosHashGet(writeRawCache, &key, POINTER_BYTES);
23,850✔
1340
  if (cacheInfo == NULL) {
23,850✔
1341
    *pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
23,850✔
1342
    RAW_NULL_CHECK(*pVgHash);
23,850✔
1343
    *pNameHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
23,850✔
1344
    RAW_NULL_CHECK(*pNameHash);
23,850✔
1345
    *pMetaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
23,850✔
1346
    RAW_NULL_CHECK(*pMetaHash);
23,850✔
1347
    taosHashSetFreeFp(*pMetaHash, tmqFreeMeta);
23,850✔
1348
    rawCacheInfo info = {*pVgHash, *pNameHash, *pMetaHash};
23,850✔
1349
    RAW_RETURN_CHECK(taosHashPut(writeRawCache, &key, POINTER_BYTES, &info, sizeof(rawCacheInfo)));
23,850✔
1350
  } else {
UNCOV
1351
    rawCacheInfo* info = (rawCacheInfo*)cacheInfo;
×
UNCOV
1352
    *pVgHash = info->pVgHash;
×
UNCOV
1353
    *pNameHash = info->pNameHash;
×
UNCOV
1354
    *pMetaHash = info->pMetaHash;
×
1355
  }
1356

1357
end:
23,850✔
1358
  if (code != 0) {
23,850✔
1359
    taosHashCleanup(*pMetaHash);
×
1360
    taosHashCleanup(*pNameHash);
×
UNCOV
1361
    taosHashCleanup(*pVgHash);
×
1362
  }
1363
  RAW_LOG_END
23,850✔
1364
  return code;
23,850✔
1365
}
1366

1367
static int32_t buildRawRequest(TAOS* taos, SRequestObj** pRequest, SCatalog** pCatalog, SRequestConnInfo* conn) {
23,850✔
1368
  if (taos == NULL || pRequest == NULL || pCatalog == NULL || conn == NULL) {
23,850✔
UNCOV
1369
    uError("invalid parameter in %s", __func__);
×
UNCOV
1370
    return TSDB_CODE_INVALID_PARA;
×
1371
  }
1372
  int32_t code = 0;
23,850✔
1373
  int32_t lino = 0;
23,850✔
1374
  RAW_LOG_START
23,850✔
1375
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, pRequest, 0));
23,850✔
1376
  (*pRequest)->syncQuery = true;
23,850✔
1377
  if (!(*pRequest)->pDb) {
23,850✔
1378
    uError("%s no database selected", __func__);
×
1379
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
1380
    goto end;
×
1381
  }
1382

1383
  RAW_RETURN_CHECK(catalogGetHandle((*pRequest)->pTscObj->pAppInfo->clusterId, pCatalog));
23,850✔
1384
  conn->pTrans = (*pRequest)->pTscObj->pAppInfo->pTransporter;
23,850✔
1385
  conn->requestId = (*pRequest)->requestId;
23,850✔
1386
  conn->requestObjRefId = (*pRequest)->self;
23,850✔
1387
  conn->mgmtEps = getEpSet_s(&(*pRequest)->pTscObj->pAppInfo->mgmtEp);
23,850✔
1388

1389
end:
23,850✔
1390
  RAW_LOG_END
23,850✔
1391
  return code;
23,850✔
1392
}
1393

1394
typedef int32_t _raw_decode_func_(SDecoder* pDecoder, SMqDataRsp* pRsp);
1395
static int32_t  decodeRawData(SDecoder* decoder, void* data, uint32_t dataLen, _raw_decode_func_ func,
23,850✔
1396
                              SMqRspObj* rspObj) {
1397
  if (decoder == NULL || data == NULL || func == NULL || rspObj == NULL) {
23,850✔
UNCOV
1398
    uError("invalid parameter in %s", __func__);
×
UNCOV
1399
    return TSDB_CODE_INVALID_PARA;
×
1400
  }
1401
  int8_t dataVersion = *(int8_t*)data;
23,850✔
1402
  if (dataVersion >= MQ_DATA_RSP_VERSION) {
23,850✔
1403
    data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t));
23,850✔
1404
    if (dataLen < sizeof(int8_t) + sizeof(int32_t)) {
23,850✔
UNCOV
1405
      return TSDB_CODE_INVALID_PARA;
×
1406
    }
1407
    dataLen -= sizeof(int8_t) + sizeof(int32_t);
23,850✔
1408
  }
1409

1410
  rspObj->resIter = -1;
23,850✔
1411
  tDecoderInit(decoder, data, dataLen);
23,850✔
1412
  int32_t code = func(decoder, &rspObj->dataRsp);
23,850✔
1413
  if (code != 0) {
23,850✔
UNCOV
1414
    SET_ERROR_MSG("decode mq taosx data rsp failed");
×
1415
  }
1416
  return code;
23,850✔
1417
}
1418

1419
static int32_t processCacheMeta(SHashObj* pVgHash, SHashObj* pNameHash, SHashObj* pMetaHash,
53,887✔
1420
                                SVCreateTbReq* pCreateReqDst, SCatalog* pCatalog, SRequestConnInfo* conn, SName* pName,
1421
                                STableMeta** pMeta, SSchemaWrapper* pSW, void* rawData, int32_t retry) {
1422
  if (pVgHash == NULL || pNameHash == NULL || pMetaHash == NULL || pCatalog == NULL || conn == NULL || pName == NULL ||
53,887✔
1423
      pMeta == NULL) {
1424
    uError("invalid parameter in %s", __func__);
×
1425
    return TSDB_CODE_INVALID_PARA;
×
1426
  }
1427
  int32_t code = 0;
53,887✔
1428
  int32_t lino = 0;
53,887✔
1429
  RAW_LOG_START
53,887✔
1430
  STableMeta* pTableMeta = NULL;
53,887✔
1431
  tbInfo*     tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname));
53,887✔
1432
  uDebug("%s tname:%s, cacheHit:%d, retry:%d, hasCreateReq:%d", __func__,
53,887✔
1433
         pName->tname, tmpInfo != NULL, retry, pCreateReqDst != NULL);
1434
  if (tmpInfo == NULL || retry > 0) {
53,887✔
1435
    tbInfo info = {0};
49,840✔
1436

1437
    RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, conn, pName, &info.vgInfo));
49,840✔
1438
    if (pCreateReqDst && tmpInfo == NULL) {  // change stable name to get meta
49,840✔
1439
      tstrncpy(pName->tname, pCreateReqDst->ctb.stbName, TSDB_TABLE_NAME_LEN);
5,146✔
1440
    }
1441
    RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, conn, pName, &pTableMeta));
49,840✔
1442
    info.uid = pTableMeta->uid;
48,305✔
1443
    if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
48,305✔
1444
      info.suid = pTableMeta->suid;
37,012✔
1445
    } else {
1446
      info.suid = pTableMeta->uid;
11,293✔
1447
    }
1448
    code = taosHashPut(pMetaHash, &info.suid, LONG_BYTES, &pTableMeta, POINTER_BYTES);
48,305✔
1449
    RAW_RETURN_CHECK(code);
48,305✔
1450

1451
    uDebug("put table meta to hash1, suid:%" PRId64 ", metaHashSIze:%d, nameHashSize:%d, vgHashSize:%d", info.suid,
48,305✔
1452
           taosHashGetSize(pMetaHash), taosHashGetSize(pNameHash), taosHashGetSize(pVgHash));
1453
    if (pCreateReqDst) {
48,305✔
1454
      pTableMeta->vgId = info.vgInfo.vgId;
5,146✔
1455
      pTableMeta->uid = pCreateReqDst->uid;
5,146✔
1456
      pCreateReqDst->ctb.suid = pTableMeta->suid;
5,146✔
1457
    }
1458

1459
    RAW_RETURN_CHECK(taosHashPut(pNameHash, pName->tname, strlen(pName->tname), &info, sizeof(tbInfo)));
48,305✔
1460
    tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname));
48,305✔
1461
    RAW_RETURN_CHECK(
48,305✔
1462
        taosHashPut(pVgHash, &info.vgInfo.vgId, sizeof(info.vgInfo.vgId), &info.vgInfo, sizeof(SVgroupInfo)));
1463
  }
1464

1465
  if (pTableMeta == NULL || retry > 0) {
52,352✔
1466
    STableMeta** pTableMetaTmp = (STableMeta**)taosHashGet(pMetaHash, &tmpInfo->suid, LONG_BYTES);
4,047✔
1467
    if (pTableMetaTmp == NULL || retry > 0 || needRefreshMeta(rawData, *pTableMetaTmp, pSW)) {
4,047✔
1468
      RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, conn, pName, &pTableMeta));
1,552✔
1469
      code = taosHashPut(pMetaHash, &tmpInfo->suid, LONG_BYTES, &pTableMeta, POINTER_BYTES);
1,552✔
1470
      RAW_RETURN_CHECK(code);
1,552✔
1471
      uDebug("put table meta to hash2, suid:%" PRId64 ", metaHashSIze:%d, nameHashSize:%d, vgHashSize:%d",
1,552✔
1472
             tmpInfo->suid, taosHashGetSize(pMetaHash), taosHashGetSize(pNameHash), taosHashGetSize(pVgHash));
1473
    } else {
1474
      pTableMeta = *pTableMetaTmp;
2,495✔
1475
      pTableMeta->uid = tmpInfo->uid;
2,495✔
1476
      pTableMeta->vgId = tmpInfo->vgInfo.vgId;
2,495✔
1477
    }
1478
  }
1479
  *pMeta = pTableMeta;
52,352✔
1480
  pTableMeta = NULL;
52,352✔
1481

1482
end:
53,887✔
1483
  taosMemoryFree(pTableMeta);
53,887✔
1484
  RAW_LOG_END
53,887✔
1485
  return code;
53,887✔
1486
}
1487

1488
static int32_t tmqWriteRawCommon(TAOS* taos, void* data, uint32_t dataLen,
23,850✔
1489
                                 _raw_decode_func_ decodeFunc, bool withMeta) {
1490
  if (taos == NULL || data == NULL) {
23,850✔
1491
    uError("invalid parameter in %s, taos:%p, data:%p, withMeta:%d", __func__, taos, data, withMeta);
×
1492
    return TSDB_CODE_INVALID_PARA;
×
1493
  }
1494
  int32_t   code = TSDB_CODE_SUCCESS;
23,850✔
1495
  int32_t   lino = 0;
23,850✔
1496
  SQuery*   pQuery = NULL;
23,850✔
1497
  SMqRspObj rspObj = {0};
23,850✔
1498
  SDecoder  decoder = {0};
23,850✔
1499
  SHashObj* pCreateTbHash = NULL;
23,850✔
1500

1501
  SRequestObj*     pRequest = NULL;
23,850✔
1502
  SCatalog*        pCatalog = NULL;
23,850✔
1503
  SRequestConnInfo conn = {0};
23,850✔
1504
  RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
23,850✔
1505
  uDebug(LOG_ID_TAG " write raw %s, data:%p, dataLen:%d", LOG_ID_VALUE, withMeta ? "metadata" : "data", data, dataLen);
23,850✔
1506
  RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, decodeFunc, &rspObj));
23,850✔
1507

1508
  if (withMeta) {
23,850✔
1509
    pCreateTbHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
3,198✔
1510
    RAW_NULL_CHECK(pCreateTbHash);
3,198✔
1511
    RAW_RETURN_CHECK(buildCreateTbMap(&rspObj.dataRsp, pCreateTbHash));
3,198✔
1512
  }
1513

1514
  SHashObj* pVgHash = NULL;
23,850✔
1515
  SHashObj* pNameHash = NULL;
23,850✔
1516
  SHashObj* pMetaHash = NULL;
23,850✔
1517
  RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));
23,850✔
1518
  int retry = 0;
23,850✔
1519
  while (1) {
1520
    RAW_RETURN_CHECK(smlInitHandle(&pQuery));
23,850✔
1521
    uDebug(LOG_ID_TAG " write raw %s block num:%d, retry:%d", LOG_ID_VALUE,
23,850✔
1522
           withMeta ? "metadata" : "data", rspObj.dataRsp.blockNum, retry);
1523
    while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
77,737✔
1524
      const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
53,887✔
1525
      RAW_NULL_CHECK(tbName);
53,887✔
1526
      SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
53,887✔
1527
      RAW_NULL_CHECK(pSW);
53,887✔
1528
      void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
53,887✔
1529
      RAW_NULL_CHECK(pRetrieve);
53,887✔
1530
      void* rawData = getRawDataFromRes(pRetrieve);
53,887✔
1531
      RAW_NULL_CHECK(rawData);
53,887✔
1532

1533
      uTrace(LOG_ID_TAG " write raw %s block[%d] tbname:%s, schemaCols:%d", LOG_ID_VALUE,
53,887✔
1534
             withMeta ? "metadata" : "data", rspObj.resIter, tbName, pSW->nCols);
1535
      SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
53,887✔
1536
      tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
53,887✔
1537
      tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN);
53,887✔
1538

1539
      SVCreateTbReq* pCreateReqDst = NULL;
53,887✔
1540
      if (pCreateTbHash) {
53,887✔
1541
        pCreateReqDst = (SVCreateTbReq*)taosHashGet(pCreateTbHash, pName.tname, strlen(pName.tname));
8,594✔
1542
      }
1543
      STableMeta* pTableMeta = NULL;
53,887✔
1544
      code = processCacheMeta(pVgHash, pNameHash, pMetaHash, pCreateReqDst, pCatalog, &conn, &pName,
53,887✔
1545
                              &pTableMeta, pSW, rawData, retry);
1546
      PROCESS_TABLE_NOT_EXIST(code, tbName)
53,887✔
1547
      char err[ERR_MSG_LEN] = {0};
52,352✔
1548
      code = rawBlockBindData(pQuery, pTableMeta, rawData, pCreateReqDst, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true);
52,352✔
1549
      if (code != TSDB_CODE_SUCCESS) {
52,352✔
UNCOV
1550
        uError(LOG_ID_TAG " rawBlockBindData failed, table:%s, err:%s, code:%s", LOG_ID_VALUE, pName.tname, err, tstrerror(code));
×
UNCOV
1551
        SET_ERROR_MSG("table:%s, err:%s", pName.tname, err);
×
UNCOV
1552
        goto end;
×
1553
      }
1554
    }
1555
    if (taosHashGetSize(pVgHash) == 0) {
23,850✔
1556
      goto end;
1,228✔
1557
    }
1558
    RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
22,622✔
1559
    launchQueryImpl(pRequest, pQuery, true, NULL);
22,622✔
1560
    code = pRequest->code;
22,622✔
1561

1562
    if (NEED_CLIENT_HANDLE_ERROR(code) && retry++ < 3) {
22,622✔
UNCOV
1563
      uInfo(LOG_ID_TAG " write raw retry:%d/3, code:%d, msg:%s", LOG_ID_VALUE, retry, code, tstrerror(code));
×
UNCOV
1564
      qDestroyQuery(pQuery);
×
UNCOV
1565
      pQuery = NULL;
×
UNCOV
1566
      rspObj.resIter = -1;
×
UNCOV
1567
      continue;
×
1568
    }
1569
    break;
22,622✔
1570
  }
1571
  uDebug(LOG_ID_TAG " write raw %s return, msg:%s", LOG_ID_VALUE, withMeta ? "metadata" : "data", tstrerror(code));
22,622✔
1572

1573
end:
23,850✔
1574
  if (withMeta) {
23,850✔
1575
    tDeleteSTaosxRsp(&rspObj.dataRsp);
3,198✔
1576
    void* pIter = taosHashIterate(pCreateTbHash, NULL);
3,198✔
1577
    while (pIter) {
8,344✔
1578
      tDestroySVCreateTbReq(pIter, TSDB_MSG_FLG_DECODE);
5,146✔
1579
      pIter = taosHashIterate(pCreateTbHash, pIter);
5,146✔
1580
    }
1581
    taosHashCleanup(pCreateTbHash);
3,198✔
1582
  } else {
1583
    tDeleteMqDataRsp(&rspObj.dataRsp);
20,652✔
1584
  }
1585
  tDecoderClear(&decoder);
23,850✔
1586
  qDestroyQuery(pQuery);
23,850✔
1587
  destroyRequest(pRequest);
23,850✔
1588
  RAW_LOG_END
23,850✔
1589
  return code;
23,850✔
1590
}
1591

1592
static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
20,652✔
1593
  return tmqWriteRawCommon(taos, data, dataLen, tDecodeMqDataRsp, false);
20,652✔
1594
}
1595

1596
static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
3,198✔
1597
  return tmqWriteRawCommon(taos, data, dataLen, tDecodeSTaosxRsp, true);
3,198✔
1598
}
1599

UNCOV
1600
static int32_t tmqWriteRawRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
×
UNCOV
1601
  if (taos == NULL || data == NULL) {
×
UNCOV
1602
    uError("invalid parameter in %s", __func__);
×
UNCOV
1603
    return TSDB_CODE_INVALID_PARA;
×
1604
  }
UNCOV
1605
  int32_t   code = TSDB_CODE_SUCCESS;
×
UNCOV
1606
  int32_t   lino = 0;
×
UNCOV
1607
  SQuery*   pQuery = NULL;
×
UNCOV
1608
  SHashObj* pVgroupHash = NULL;
×
UNCOV
1609
  SMqRspObj rspObj = {0};
×
UNCOV
1610
  SDecoder  decoder = {0};
×
1611

UNCOV
1612
  SRequestObj*     pRequest = NULL;
×
UNCOV
1613
  SCatalog*        pCatalog = NULL;
×
UNCOV
1614
  SRequestConnInfo conn = {0};
×
1615

UNCOV
1616
  RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
×
UNCOV
1617
  uDebug(LOG_ID_TAG " write raw rawdata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
×
UNCOV
1618
  RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeMqDataRsp, &rspObj));
×
1619

UNCOV
1620
  SHashObj* pVgHash = NULL;
×
UNCOV
1621
  SHashObj* pNameHash = NULL;
×
UNCOV
1622
  SHashObj* pMetaHash = NULL;
×
UNCOV
1623
  RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));
×
UNCOV
1624
  int retry = 0;
×
UNCOV
1625
  while (1) {
×
UNCOV
1626
    RAW_RETURN_CHECK(smlInitHandle(&pQuery));
×
UNCOV
1627
    uDebug(LOG_ID_TAG " write raw rawdata block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);
×
UNCOV
1628
    SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)(pQuery)->pRoot;
×
UNCOV
1629
    pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
×
UNCOV
1630
    RAW_NULL_CHECK(pVgroupHash);
×
UNCOV
1631
    pStmt->pVgDataBlocks = taosArrayInit(8, POINTER_BYTES);
×
UNCOV
1632
    RAW_NULL_CHECK(pStmt->pVgDataBlocks);
×
1633

UNCOV
1634
    while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
×
UNCOV
1635
      const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
×
UNCOV
1636
      RAW_NULL_CHECK(tbName);
×
UNCOV
1637
      void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
×
UNCOV
1638
      RAW_NULL_CHECK(pRetrieve);
×
UNCOV
1639
      void* rawData = getRawDataFromRes(pRetrieve);
×
UNCOV
1640
      RAW_NULL_CHECK(rawData);
×
1641

UNCOV
1642
      uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
×
UNCOV
1643
      SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
×
UNCOV
1644
      tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
×
UNCOV
1645
      tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN);
×
1646

1647
      // find schema data info
UNCOV
1648
      STableMeta* pTableMeta = NULL;
×
UNCOV
1649
      code = processCacheMeta(pVgHash, pNameHash, pMetaHash, NULL, pCatalog, &conn, &pName, &pTableMeta, NULL,
×
1650
                                        NULL, retry);
UNCOV
1651
      PROCESS_TABLE_NOT_EXIST(code, tbName)
×
UNCOV
1652
      char err[ERR_MSG_LEN] = {0};
×
UNCOV
1653
      code = rawBlockBindRawData(pVgroupHash, pStmt->pVgDataBlocks, pTableMeta, rawData);
×
UNCOV
1654
      if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1655
        SET_ERROR_MSG("table:%s, err:%s", pName.tname, err);
×
UNCOV
1656
        goto end;
×
1657
      }
1658
    }
UNCOV
1659
    taosHashCleanup(pVgroupHash);
×
UNCOV
1660
    pVgroupHash = NULL;
×
1661

UNCOV
1662
    RAW_RETURN_CHECK(smlBuildOutputRaw(pQuery, pVgHash));
×
UNCOV
1663
    launchQueryImpl(pRequest, pQuery, true, NULL);
×
UNCOV
1664
    code = pRequest->code;
×
1665

UNCOV
1666
    if (NEED_CLIENT_HANDLE_ERROR(code) && retry++ < 3) {
×
UNCOV
1667
      uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
×
UNCOV
1668
      qDestroyQuery(pQuery);
×
UNCOV
1669
      pQuery = NULL;
×
UNCOV
1670
      rspObj.resIter = -1;
×
UNCOV
1671
      continue;
×
1672
    }
UNCOV
1673
    break;
×
1674
  }
UNCOV
1675
  uDebug(LOG_ID_TAG " write raw rawdata return, msg:%s", LOG_ID_VALUE, tstrerror(code));
×
1676

UNCOV
1677
end:
×
UNCOV
1678
  tDeleteMqDataRsp(&rspObj.dataRsp);
×
UNCOV
1679
  tDecoderClear(&decoder);
×
UNCOV
1680
  qDestroyQuery(pQuery);
×
UNCOV
1681
  taosHashCleanup(pVgroupHash);
×
UNCOV
1682
  destroyRequest(pRequest);
×
UNCOV
1683
  RAW_LOG_END
×
1684
  return code;
×
1685
}
1686

1687
static int32_t getOffSetLen(const SMqDataRsp* pRsp) {
23,898✔
1688
  if (pRsp == NULL) {
23,898✔
UNCOV
1689
    uError("invalid parameter in %s", __func__);
×
UNCOV
1690
    return TSDB_CODE_INVALID_PARA;
×
1691
  }
1692
  int32_t pos = 0;
23,898✔
1693
  int32_t code = 0;
23,898✔
1694
  int32_t lino = 0;
23,898✔
1695
  RAW_LOG_START
23,898✔
1696
  SEncoder coder = {0};
23,898✔
1697
  tEncoderInit(&coder, NULL, 0);
23,898✔
1698
  RAW_RETURN_CHECK(tEncodeSTqOffsetVal(&coder, &pRsp->reqOffset));
23,898✔
1699
  RAW_RETURN_CHECK(tEncodeSTqOffsetVal(&coder, &pRsp->rspOffset));
23,898✔
1700
  pos = coder.pos;
23,898✔
1701
  tEncoderClear(&coder);
23,898✔
1702

1703
end:
23,898✔
1704
  if (code != 0) {
23,898✔
UNCOV
1705
    uError("getOffSetLen failed, code:%d", code);
×
UNCOV
1706
    return code;
×
1707
  } else {
1708
    uDebug("getOffSetLen success, len:%d", pos);
23,898✔
1709
    return pos;
23,898✔
1710
  }
1711
}
1712

1713
typedef int32_t __encode_func__(SEncoder* pEncoder, const SMqDataRsp* pRsp);
1714
static int32_t  encodeMqDataRsp(__encode_func__* encodeFunc, SMqDataRsp* rspObj, tmq_raw_data* raw) {
23,898✔
1715
  if (raw == NULL || encodeFunc == NULL || rspObj == NULL) {
23,898✔
UNCOV
1716
    uError("invalid parameter in %s", __func__);
×
UNCOV
1717
    return TSDB_CODE_INVALID_PARA;
×
1718
  }
1719
  uint32_t len = 0;
23,898✔
1720
  int32_t  code = 0;
23,898✔
1721
  int32_t  lino = 0;
23,898✔
1722
  SEncoder encoder = {0};
23,898✔
1723
  void*    buf = NULL;
23,898✔
1724
  tEncodeSize(encodeFunc, rspObj, len, code);
23,898✔
1725
  RAW_FALSE_CHECK(code >= 0);
23,898✔
1726
  len += sizeof(int8_t) + sizeof(int32_t);
23,898✔
1727
  buf = taosMemoryCalloc(1, len);
23,898✔
1728
  RAW_NULL_CHECK(buf);
23,898✔
1729
  tEncoderInit(&encoder, buf, len);
23,898✔
1730
  RAW_RETURN_CHECK(tEncodeI8(&encoder, MQ_DATA_RSP_VERSION));
23,898✔
1731
  int32_t offsetLen = getOffSetLen(rspObj);
23,898✔
1732
  RAW_FALSE_CHECK(offsetLen > 0);
23,898✔
1733
  RAW_RETURN_CHECK(tEncodeI32(&encoder, offsetLen));
23,898✔
1734
  RAW_RETURN_CHECK(encodeFunc(&encoder, rspObj));
23,898✔
1735

1736
  raw->raw = buf;
23,898✔
1737
  buf = NULL;
23,898✔
1738
  raw->raw_len = len;
23,898✔
1739

1740
end:
23,898✔
1741
  RAW_LOG_END
23,898✔
1742
  return code;
23,898✔
1743
}
1744

1745
int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
107,048✔
1746
  if (raw == NULL || res == NULL) {
107,048✔
UNCOV
1747
    uError("invalid parameter in %s", __func__);
×
UNCOV
1748
    return TSDB_CODE_INVALID_PARA;
×
1749
  }
1750
  int32_t code = TSDB_CODE_SUCCESS;
107,048✔
1751
  int32_t lino = 0;
107,048✔
1752
  RAW_LOG_START
107,048✔
1753
  *raw = (tmq_raw_data){0};
107,048✔
1754
  SMqRspObj* rspObj = ((SMqRspObj*)res);
107,048✔
1755
  uDebug("tmq_get_raw resType:%d", rspObj->resType);
107,048✔
1756
  if (TD_RES_TMQ_META(res)) {
107,048✔
1757
    raw->raw = rspObj->metaRsp.metaRsp;
77,671✔
1758
    raw->raw_len = rspObj->metaRsp.metaRspLen >= 0 ? rspObj->metaRsp.metaRspLen : 0;
77,671✔
1759
    raw->raw_type = rspObj->metaRsp.resMsgType;
77,671✔
1760
    uDebug("tmq get raw type meta:%p", raw);
77,671✔
1761
  } else if (TD_RES_TMQ(res)) {
29,377✔
1762
    RAW_RETURN_CHECK(encodeMqDataRsp(tEncodeMqDataRsp, &rspObj->dataRsp, raw));
20,700✔
1763
    raw->raw_type = RES_TYPE__TMQ;
20,700✔
1764
    uDebug("tmq get raw type data:%p", raw);
20,700✔
1765
  } else if (TD_RES_TMQ_METADATA(res)) {
8,677✔
1766
    RAW_RETURN_CHECK(encodeMqDataRsp(tEncodeSTaosxRsp, &rspObj->dataRsp, raw));
3,198✔
1767
    raw->raw_type = RES_TYPE__TMQ_METADATA;
3,198✔
1768
    uDebug("tmq get raw type metadata:%p", raw);
3,198✔
1769
  } else if (TD_RES_TMQ_BATCH_META(res)) {
5,479✔
1770
    raw->raw = rspObj->batchMetaRsp.pMetaBuff;
5,479✔
1771
    raw->raw_len = rspObj->batchMetaRsp.metaBuffLen;
5,479✔
1772
    raw->raw_type = rspObj->resType;
5,479✔
1773
    uDebug("tmq get raw batch meta:%p", raw);
5,479✔
UNCOV
1774
  } else if (TD_RES_TMQ_RAW(res)) {
×
UNCOV
1775
    raw->raw = rspObj->dataRsp.rawData;
×
UNCOV
1776
    rspObj->dataRsp.rawData = NULL;
×
UNCOV
1777
    raw->raw_len = rspObj->dataRsp.len;
×
UNCOV
1778
    raw->raw_type = rspObj->resType;
×
UNCOV
1779
    uDebug("tmq get raw raw:%p", raw);
×
1780
  } else {
1781
    uError("tmq get raw error type:%d", *(int8_t*)res);
×
1782
    code = TSDB_CODE_TMQ_INVALID_MSG;
×
1783
  }
1784

1785
end:
107,048✔
1786
  RAW_LOG_END
107,048✔
1787
  return code;
107,048✔
1788
}
1789

1790
void tmq_free_raw(tmq_raw_data raw) {
107,000✔
1791
  uDebug("tmq free raw data type:%d", raw.raw_type);
107,000✔
1792
  if (raw.raw_type == RES_TYPE__TMQ || raw.raw_type == RES_TYPE__TMQ_METADATA) {
107,000✔
1793
    taosMemoryFree(raw.raw);
23,850✔
1794
  } else if (raw.raw_type == RES_TYPE__TMQ_RAWDATA && raw.raw != NULL) {
83,150✔
UNCOV
1795
    taosMemoryFree(POINTER_SHIFT(raw.raw, -sizeof(SMqRspHead)));
×
1796
  }
1797
  (void)memset(terrMsg, 0, ERR_MSG_LEN);
107,000✔
1798
}
107,000✔
1799

1800
static int32_t writeRawInit() {
146,689✔
1801
  while (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_START) {
154,072✔
1802
    int8_t old = atomic_val_compare_exchange_8(&initFlag, 0, 1);
7,383✔
1803
    if (old == 0) {
7,383✔
1804
      int32_t code = initRawCacheHash();
7,383✔
1805
      if (code != 0) {
7,383✔
UNCOV
1806
        uError("tmq writeRawImpl init error:%d", code);
×
UNCOV
1807
        atomic_store_8(&initedFlag, WRITE_RAW_INIT_FAIL);
×
UNCOV
1808
        return code;
×
1809
      }
1810
      atomic_store_8(&initedFlag, WRITE_RAW_INIT_OK);
7,383✔
1811
    }
1812
  }
1813

1814
  if (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_FAIL) {
146,689✔
UNCOV
1815
    return TSDB_CODE_INTERNAL_ERROR;
×
1816
  }
1817
  return 0;
146,689✔
1818
}
1819

1820
static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type) {
146,689✔
1821
  if (taos == NULL || buf == NULL) {
146,689✔
UNCOV
1822
    uError("invalid parameter in %s", __func__);
×
UNCOV
1823
    return TSDB_CODE_INVALID_PARA;
×
1824
  }
1825
  if (writeRawInit() != 0) {
146,689✔
UNCOV
1826
    return TSDB_CODE_INTERNAL_ERROR;
×
1827
  }
1828

1829
  uDebug("writeRawImpl type:%d, buf:%p, len:%d", type, buf, len);
146,689✔
1830
  switch (type) {
146,689✔
1831
    case TDMT_VND_CREATE_STB:
40,816✔
1832
    case TDMT_VND_ALTER_STB:
1833
      return taosCreateStb(taos, buf, len);
40,816✔
1834
    case TDMT_VND_DROP_STB:
2,482✔
1835
      return taosDropStb(taos, buf, len);
2,482✔
1836
    case TDMT_VND_CREATE_TABLE:
55,310✔
1837
      return taosCreateTable(taos, buf, len);
55,310✔
1838
    case TDMT_VND_ALTER_TABLE:
15,611✔
1839
      return taosAlterTable(taos, buf, len);
15,611✔
1840
    case TDMT_VND_DROP_TABLE:
2,207✔
1841
      return taosDropTable(taos, buf, len);
2,207✔
1842
    case TDMT_VND_DELETE:
934✔
1843
      return taosDeleteData(taos, buf, len);
934✔
1844
    case RES_TYPE__TMQ_METADATA:
3,198✔
1845
      return tmqWriteRawMetaDataImpl(taos, buf, len);
3,198✔
1846
    case RES_TYPE__TMQ_RAWDATA:
×
1847
      return tmqWriteRawRawDataImpl(taos, buf, len);
×
1848
    case RES_TYPE__TMQ:
20,652✔
1849
      return tmqWriteRawDataImpl(taos, buf, len);
20,652✔
1850
    case RES_TYPE__TMQ_BATCH_META:
5,479✔
1851
      return tmqWriteBatchMetaDataImpl(taos, buf, len);
5,479✔
UNCOV
1852
    default:
×
UNCOV
1853
      uError("writeRawImpl unknown type:%d", type);
×
UNCOV
1854
      return TSDB_CODE_INVALID_PARA;
×
1855
  }
1856
}
1857

1858
int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {
102,360✔
1859
  if (taos == NULL || raw.raw == NULL || raw.raw_len <= 0) {
102,360✔
1860
    SET_ERROR_MSG("taos:%p or data:%p is NULL or raw_len <= 0", taos, raw.raw);
3,087✔
1861
    return TSDB_CODE_INVALID_PARA;
3,087✔
1862
  }
1863
  taosClearErrMsg();  // clear global error message
99,273✔
1864
  uDebug("tmq_write_raw connId:0x%" PRIx64 ", raw_type:%d, raw_len:%d", *(int64_t*)taos, raw.raw_type, raw.raw_len);
99,273✔
1865

1866
  int32_t code = writeRawImpl(taos, raw.raw, raw.raw_len, raw.raw_type);
99,273✔
1867
  if (code != TSDB_CODE_SUCCESS) {
99,273✔
UNCOV
1868
    uError("tmq_write_raw connId:0x%" PRIx64 " failed, raw_type:%d, code:%s", *(int64_t*)taos, raw.raw_type, tstrerror(code));
×
1869
  }
1870
  return code;
99,273✔
1871
}
1872

1873
static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, uint32_t metaLen) {
5,479✔
1874
  if (taos == NULL || meta == NULL) {
5,479✔
UNCOV
1875
    uError("invalid parameter in %s", __func__);
×
UNCOV
1876
    return TSDB_CODE_INVALID_PARA;
×
1877
  }
1878
  SMqBatchMetaRsp rsp = {0};
5,479✔
1879
  SDecoder        coder = {0};
5,479✔
1880
  SDecoder        metaCoder = {0};
5,479✔
1881
  int32_t         code = TSDB_CODE_SUCCESS;
5,479✔
1882
  int32_t         lino = 0;
5,479✔
1883

1884
  RAW_LOG_START
5,479✔
1885
  // decode and process req
1886
  tDecoderInit(&coder, meta, metaLen);
5,479✔
1887
  RAW_RETURN_CHECK(tDecodeMqBatchMetaRsp(&coder, &rsp));
5,479✔
1888
  int32_t num = taosArrayGetSize(rsp.batchMetaReq);
5,479✔
1889
  uDebug("%s batch meta count:%d, metaLen:%d", __func__, num, metaLen);
5,479✔
1890
  for (int32_t i = 0; i < num; i++) {
52,895✔
1891
    int32_t* len = taosArrayGet(rsp.batchMetaLen, i);
47,416✔
1892
    RAW_NULL_CHECK(len);
47,416✔
1893
    void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i);
47,416✔
1894
    RAW_NULL_CHECK(tmpBuf);
47,416✔
1895
    SMqMetaRsp metaRsp = {0};
47,416✔
1896
    tDecoderInit(&metaCoder, POINTER_SHIFT(tmpBuf, sizeof(SMqRspHead)), *len - sizeof(SMqRspHead));
47,416✔
1897
    RAW_RETURN_CHECK(tDecodeMqMetaRsp(&metaCoder, &metaRsp));
47,416✔
1898
    uDebug("%s processing batch item %d/%d, resMsgType:%d, metaRspLen:%d", __func__, i, num,
47,416✔
1899
           metaRsp.resMsgType, metaRsp.metaRspLen);
1900
    code = writeRawImpl(taos, metaRsp.metaRsp, metaRsp.metaRspLen, metaRsp.resMsgType);
47,416✔
1901
    tDeleteMqMetaRsp(&metaRsp);
47,416✔
1902
    tDecoderClear(&metaCoder);
47,416✔
1903
    if (code != TSDB_CODE_SUCCESS) {
47,416✔
UNCOV
1904
      uError("%s batch item %d/%d failed, resMsgType:%d, code:%s", __func__, i, num,
×
1905
             metaRsp.resMsgType, tstrerror(code));
UNCOV
1906
      goto end;
×
1907
    }
1908
  }
1909

1910
end:
5,479✔
1911
  tDecoderClear(&coder);
5,479✔
1912
  tDecoderClear(&metaCoder);
5,479✔
1913
  tDeleteMqBatchMetaRsp(&rsp);
5,479✔
1914
  RAW_LOG_END
5,479✔
1915
  return code;
5,479✔
1916
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc