• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5034

24 Apr 2026 11:25AM UTC coverage: 73.058%. Remained the same
#5034

push

travis-ci

web-flow
merge: from main to 3.0 branch #35224

merge: from main to 3.0 branch[manual-only]

1336 of 1975 new or added lines in 48 files covered. (67.65%)

14149 existing lines in 164 files now uncovered.

275896 of 377640 relevant lines covered (73.06%)

132944440.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.22
/source/client/src/clientRawBlockWrite.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include <string.h>
17
#include "cJSON.h"
18
#include "clientInt.h"
19
#include "clientRawBlockWrite.h"
20
#include "nodes.h"
21
#include "osMemPool.h"
22
#include "osMemory.h"
23
#include "parser.h"
24
#include "taosdef.h"
25
#include "tarray.h"
26
#include "tbase64.h"
27
#include "tcol.h"
28
#include "tcompression.h"
29
#include "tdatablock.h"
30
#include "tdataformat.h"
31
#include "tdef.h"
32
#include "tglobal.h"
33
#include "tmsgtype.h"
34

35
static int32_t  tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, uint32_t metaLen);
36
/*
 * Map a super-table uid onto the uid used for the given database: the input
 * uid offset by the MurmurHash3_32 hash of the database name.
 * A NULL database name leaves the uid unchanged.
 */
static tb_uid_t processSuid(tb_uid_t suid, char* db) {
  return (db != NULL) ? suid + MurmurHash3_32(db, strlen(db)) : suid;
}
42

43
/*
 * Replay a "create super table" meta message against the database selected on
 * the connection.
 *
 * The payload that follows the SMsgHead is decoded as an SVCreateStbReq and
 * converted into an SMCreateStbReq (columns, tags, schema versions, remapped
 * suid). That request is serialized and launched with message type
 * TDMT_MND_CREATE_STB. When the request succeeds, the catalog entry of the
 * table is removed via catalogRemoveTableMeta.
 *
 * @param taos     connection handle; dereferenced as an int64_t id for buildRequest
 * @param meta     raw meta buffer that starts with an SMsgHead
 * @param metaLen  size of meta in bytes; must be at least sizeof(SMsgHead)
 * @return TSDB_CODE_INVALID_PARA for NULL or too-short input,
 *         TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request has no database,
 *         otherwise the code of the first failing step or of the request itself.
 */
static int32_t taosCreateStb(TAOS* taos, void* meta, uint32_t metaLen) {
  // metaLen must cover the SMsgHead, otherwise the payload length computed below wraps around
  if (taos == NULL || meta == NULL || metaLen < sizeof(SMsgHead)) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVCreateStbReq req = {0};
  SDecoder       coder = {0};
  SMCreateStbReq pReq = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SRequestObj*   pRequest = NULL;
  SCmdMsgInfo    pCmdMsg = {0};
  RAW_LOG_START

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " create stable, meta:%p, metaLen:%d, db:%s", LOG_ID_VALUE, meta, metaLen,
         pRequest->pDb ? pRequest->pDb : "NULL");
  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
    goto end;
  }
  // decode and process req: the encoded request starts right after the message head
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  RAW_RETURN_CHECK(tDecodeSVCreateStbReq(&coder, &req));

  // a request without compression info gets the per-type default for every column
  int8_t           createDefaultCompress = 0;
  SColCmprWrapper* p = &req.colCmpr;
  if (p->nCols == 0) {
    createDefaultCompress = 1;
  }
  // build create stable
  pReq.pColumns = taosArrayInit(req.schemaRow.nCols, sizeof(SFieldWithOptions));
  RAW_NULL_CHECK(pReq.pColumns);
  for (int32_t i = 0; i < req.schemaRow.nCols; i++) {
    SSchema*          pSchema = req.schemaRow.pSchema + i;
    SFieldWithOptions field = {.type = pSchema->type, .flags = pSchema->flags, .bytes = pSchema->bytes};
    tstrncpy(field.name, pSchema->name, TSDB_COL_NAME_LEN);

    if (createDefaultCompress) {
      field.compress = createDefaultColCmprByType(pSchema->type);
    } else {
      SColCmpr* pCmp = &req.colCmpr.pColCmpr[i];
      field.compress = pCmp->alg;
    }
    if (req.pExtSchemas) field.typeMod = req.pExtSchemas[i].typeMod;
    RAW_NULL_CHECK(taosArrayPush(pReq.pColumns, &field));
  }
  pReq.pTags = taosArrayInit(req.schemaTag.nCols, sizeof(SField));
  RAW_NULL_CHECK(pReq.pTags);
  for (int32_t i = 0; i < req.schemaTag.nCols; i++) {
    SSchema* pSchema = req.schemaTag.pSchema + i;
    SField   field = {.type = pSchema->type, .flags = pSchema->flags, .bytes = pSchema->bytes};
    tstrncpy(field.name, pSchema->name, TSDB_COL_NAME_LEN);
    RAW_NULL_CHECK(taosArrayPush(pReq.pTags, &field));
  }

  pReq.colVer = req.schemaRow.version;
  pReq.tagVer = req.schemaTag.version;
  pReq.numOfColumns = req.schemaRow.nCols;
  pReq.numOfTags = req.schemaTag.nCols;
  pReq.commentLen = -1;
  pReq.suid = processSuid(req.suid, pRequest->pDb);
  pReq.source = TD_REQ_FROM_TAOX;
  pReq.igExists = true;
  pReq.virtualStb = req.virtualStb;
  pReq.securityLevel = req.securityLevel;  // Preserve source cluster's security classification

  uDebug(LOG_ID_TAG " create stable name:%s suid:%" PRId64 " processSuid:%" PRId64, LOG_ID_VALUE, req.name, req.suid,
         pReq.suid);
  STscObj* pTscObj = pRequest->pTscObj;
  SName    tableName = {0};
  toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName);
  RAW_RETURN_CHECK(tNameExtractFullName(&tableName, pReq.name));
  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_CREATE_STB;
  // first pass with a NULL buffer yields the serialized size, second pass fills the buffer
  pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
  RAW_FALSE_CHECK(pCmdMsg.msgLen > 0);
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  RAW_NULL_CHECK(pCmdMsg.pMsg);
  RAW_FALSE_CHECK(tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq) > 0);

  SQuery pQuery = {0};
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);  // ignore, because return value is pRequest

  if (pRequest->code == TSDB_CODE_SUCCESS) {
    // remove the catalog's cached meta for this table after a successful create
    SCatalog* pCatalog = NULL;
    RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
    RAW_RETURN_CHECK(catalogRemoveTableMeta(pCatalog, &tableName));
  }

  code = pRequest->code;
  uDebug(LOG_ID_TAG " create stable return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  destroyRequest(pRequest);
  tFreeSMCreateStbReq(&pReq);
  tDecoderClear(&coder);
  taosMemoryFree(pCmdMsg.pMsg);
  RAW_LOG_END
  return code;
}
153

154
/*
 * Replay a "drop super table" meta message against the database selected on
 * the connection.
 *
 * The payload that follows the SMsgHead is decoded as an SVDropStbReq. The uid
 * of the table is looked up through the catalog; if the table does not exist the
 * drop is ignored and success is returned. Otherwise an SMDropStbReq is
 * serialized and launched with message type TDMT_MND_DROP_STB, and on success the
 * catalog entry of the table is removed via catalogRemoveTableMeta.
 *
 * @param taos     connection handle; dereferenced as an int64_t id for buildRequest
 * @param meta     raw meta buffer that starts with an SMsgHead
 * @param metaLen  size of meta in bytes; must be at least sizeof(SMsgHead)
 * @return TSDB_CODE_INVALID_PARA for NULL or too-short input,
 *         TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request has no database,
 *         otherwise the code of the first failing step or of the request itself.
 */
static int32_t taosDropStb(TAOS* taos, void* meta, uint32_t metaLen) {
  // metaLen must cover the SMsgHead, otherwise the payload length computed below wraps around
  if (taos == NULL || meta == NULL || metaLen < sizeof(SMsgHead)) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVDropStbReq req = {0};
  SDecoder     coder = {0};
  SMDropStbReq pReq = {0};
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SRequestObj* pRequest = NULL;
  SCmdMsgInfo  pCmdMsg = {0};

  RAW_LOG_START
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " drop stable, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);
  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req: the encoded request starts right after the message head
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  RAW_RETURN_CHECK(tDecodeSVDropStbReq(&coder, &req));
  SCatalog* pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));
  SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)};
  SName            pName = {0};
  toName(pRequest->pTscObj->acctId, pRequest->pDb, req.name, &pName);
  STableMeta* pTableMeta = NULL;
  code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
    // nothing to drop on this side: treat as success
    uInfo(LOG_ID_TAG " stable %s not exist, ignore drop", LOG_ID_VALUE, req.name);
    code = TSDB_CODE_SUCCESS;
    taosMemoryFreeClear(pTableMeta);
    goto end;
  }
  RAW_RETURN_CHECK(code);
  // the suid sent to the mnode is the uid reported by the catalog, not the one in the message
  pReq.suid = pTableMeta->uid;
  taosMemoryFreeClear(pTableMeta);

  // build drop stable
  pReq.igNotExists = true;
  pReq.source = TD_REQ_FROM_TAOX;

  uDebug(LOG_ID_TAG " drop stable name:%s suid:%" PRId64 " new suid:%" PRId64, LOG_ID_VALUE, req.name, req.suid,
         pReq.suid);
  STscObj* pTscObj = pRequest->pTscObj;
  SName    tableName = {0};
  toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName);
  RAW_RETURN_CHECK(tNameExtractFullName(&tableName, pReq.name));

  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_DROP_STB;
  // first pass with a NULL buffer yields the serialized size, second pass fills the buffer
  pCmdMsg.msgLen = tSerializeSMDropStbReq(NULL, 0, &pReq);
  RAW_FALSE_CHECK(pCmdMsg.msgLen > 0);
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  RAW_NULL_CHECK(pCmdMsg.pMsg);
  RAW_FALSE_CHECK(tSerializeSMDropStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq) > 0);

  SQuery pQuery = {0};
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);  // ignore, because return value is pRequest
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    // remove the catalog's cached meta for the dropped table
    RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
    RAW_RETURN_CHECK(catalogRemoveTableMeta(pCatalog, &tableName));
  }

  code = pRequest->code;
  uDebug(LOG_ID_TAG " drop stable return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  RAW_LOG_END
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  // release the serialized message if it is still held here (e.g. second serialization failed);
  // taosCreateStb performs the same cleanup for its pCmdMsg
  taosMemoryFreeClear(pCmdMsg.pMsg);
  return code;
}
243

244
/*
 * Create-table requests collected for one vgroup. taosCreateTable stores these by
 * value in a hash keyed by vgId; destroyCreateTbReqBatch releases req.pArray.
 */
typedef struct SVgroupCreateTableBatch {
245
  SVCreateTbBatchReq req;  // req.pArray holds the SVCreateTbReq entries routed to this vgroup
246
  SVgroupInfo        info;  // vgroup info obtained from catalogGetTableHashVgroup
247
  char               dbName[TSDB_DB_NAME_LEN];  // copy of the request's database name
248
} SVgroupCreateTableBatch;
249

250
static void destroyCreateTbReqBatch(void* data) {
56,967✔
251
  if (data == NULL) {
56,967✔
UNCOV
252
    uError("invalid parameter in %s", __func__);
×
253
    return;
×
254
  }
255
  SVgroupCreateTableBatch* pTbBatch = (SVgroupCreateTableBatch*)data;
56,967✔
256
  taosArrayDestroy(pTbBatch->req.pArray);
56,967✔
257
}
258

259
static const SSchema* getNormalColSchema(const STableMeta* pTableMeta, const char* pColName) {
6,898✔
260
  for (int32_t i = 0; i < pTableMeta->tableInfo.numOfColumns; ++i) {
19,932✔
261
    const SSchema* pSchema = pTableMeta->schema + i;
19,932✔
262
    if (0 == strcmp(pColName, pSchema->name)) {
19,932✔
263
      return pSchema;
6,898✔
264
    }
265
  }
UNCOV
266
  return NULL;
×
267
}
268

269
/*
 * Fetch the table meta of acctId/dbName/tbName through the catalog.
 * On success the meta returned by catalogGetTableMeta is handed to the caller,
 * who releases it (callers in this file use taosMemoryFreeClear).
 * On failure the error is logged, terrno is set to the catalog's code and NULL
 * is returned.
 */
static STableMeta* getTableMeta(SCatalog* pCatalog, SRequestConnInfo* conn, char* dbName, char* tbName, int32_t acctId){
  STableMeta* pMeta = NULL;
  SName       fullName = {0};

  toName(acctId, dbName, tbName, &fullName);
  int32_t rc = catalogGetTableMeta(pCatalog, conn, &fullName, &pMeta);
  if (rc == 0) {
    return pMeta;
  }

  uError("failed to get table meta for reference table:%s.%s", dbName, tbName);
  taosMemoryFreeClear(pMeta);
  terrno = rc;
  return NULL;
}
282

283
static int32_t checkColRef(STableMeta* pTableMeta, char* colName, uint8_t precision, const char* pSchemaName,
6,513✔
284
                           const SDataType* pType) {
285
  int32_t code = TSDB_CODE_SUCCESS;
6,513✔
286
  if (pTableMeta->tableInfo.precision != precision) {
6,513✔
UNCOV
287
    code = TSDB_CODE_PAR_INVALID_REF_COLUMN_TYPE;
×
288
    uError("timestamp precision of virtual table and its reference table do not match");
×
289
    goto end;
×
290
  }
291
  // org table cannot has composite primary key
292
  if (pTableMeta->tableInfo.numOfColumns > 1 && pTableMeta->schema[1].flags & COL_IS_KEY) {
6,513✔
UNCOV
293
    code = TSDB_CODE_PAR_INVALID_REF_COLUMN;
×
294
    uError("virtual table's column:\"%s\"'s reference can not from table with composite key", colName);
×
295
    goto end;
×
296
  }
297

298
  // org table must be child table or normal table
299
  if (pTableMeta->tableType != TSDB_NORMAL_TABLE && pTableMeta->tableType != TSDB_CHILD_TABLE) {
6,513✔
UNCOV
300
    code = TSDB_CODE_PAR_INVALID_REF_COLUMN;
×
301
    uError("virtual table's column:\"%s\"'s reference can only be normal table or child table", colName);
×
302
    goto end;
×
303
  }
304

305
  const SSchema* pRefCol = getNormalColSchema(pTableMeta, colName);
6,513✔
306
  if (NULL == pRefCol) {
6,513✔
UNCOV
307
    code = TSDB_CODE_PAR_INVALID_REF_COLUMN;
×
308
    uError("virtual table's column:\"%s\"'s reference column:\"%s\" not exist", pSchemaName, colName);
×
309
    goto end;
×
310
  }
311

312
  int32_t refColIndex = getNormalColSchemaIndex(pTableMeta, colName);
6,513✔
313
  const SSchemaExt* pRefSchemaExt =
6,513✔
314
      (refColIndex >= 0 && pTableMeta->schemaExt && refColIndex < pTableMeta->tableInfo.numOfColumns)
6,513✔
315
          ? pTableMeta->schemaExt + refColIndex
6,513✔
316
          : NULL;
13,026✔
317
  SDataType refType = {0};
6,513✔
318
  schemaToRefDataType(pRefCol, NULL != pRefSchemaExt ? pRefSchemaExt->typeMod : 0, &refType);
6,513✔
319

320
  if (!isSameRefDataType(pType, &refType)) {
6,513✔
UNCOV
321
    code = TSDB_CODE_PAR_INVALID_REF_COLUMN_TYPE;
×
322
    uError("virtual table's column:\"%s\"'s type and reference column:\"%s\"'s type not match, %d %d %d %d",
×
323
            pSchemaName, colName, pType->type, pType->bytes, refType.type, refType.bytes);
UNCOV
324
    goto end;
×
325
  }
326

327
end:
6,513✔
328
  return code;
6,513✔
329
}
330

331
/*
 * Validate one column reference of a table that is about to be created.
 * Loads the meta of the table named by pColRef (refDbName/refTableName), builds
 * the column's data type from pSchema/pSchemaExt and delegates to checkColRef.
 *
 * @return terrno when the referenced table's meta cannot be loaded, otherwise
 *         the result of checkColRef.
 */
static int32_t checkColRefForCreate(SCatalog* pCatalog, SRequestConnInfo* conn, SColRef* pColRef, int32_t acctId,
                                    uint8_t precision, SSchema* pSchema, const SSchemaExt* pSchemaExt) {
  STableMeta* pRefMeta = getTableMeta(pCatalog, conn, pColRef->refDbName, pColRef->refTableName, acctId);
  if (NULL == pRefMeta) {
    return terrno;
  }

  SDataType vColType = {0};
  schemaToRefDataType(pSchema, pSchemaExt != NULL ? pSchemaExt->typeMod : 0, &vColType);

  int32_t rc = checkColRef(pRefMeta, pColRef->refColName, precision, pSchema->name, &vColType);
  taosMemoryFreeClear(pRefMeta);
  return rc;
}
343

UNCOV
344
/*
 * Validate a column reference for a column that is being added.
 *
 * Loads the meta of dbName.tbName and of dbNameSrc.tbNameSrc, builds the column's
 * data type from type/bytes/typeMod, and runs checkColRef on dbName.tbName /
 * colName using the precision of dbNameSrc.tbNameSrc. colNameSrc is passed to
 * checkColRef as the schema name.
 *
 * @return terrno when either table meta cannot be loaded, otherwise the result
 *         of checkColRef.
 */
static int32_t checkColRefForAdd(SCatalog* pCatalog, SRequestConnInfo* conn, int32_t acctId, char* dbName, char* tbName, char* colName, 
  char* dbNameSrc, char* tbNameSrc, char* colNameSrc, int8_t type, int32_t bytes, STypeMod typeMod) {
  int32_t code = 0;
  // both pointers are initialized up front: the cleanup at `end` frees both, and the
  // first failure path jumps there before pTableMetaSrc has been assigned
  STableMeta* pTableMeta = NULL;
  STableMeta* pTableMetaSrc = NULL;

  pTableMeta = getTableMeta(pCatalog, conn, dbName, tbName, acctId);
  if (pTableMeta == NULL) {
    code = terrno;
    goto end;
  }
  pTableMetaSrc = getTableMeta(pCatalog, conn, dbNameSrc, tbNameSrc, acctId);
  if (pTableMetaSrc == NULL) {
    code = terrno;
    goto end;
  }

  SDataType colType = {.type = type, .bytes = bytes};
  fillTypeFromTypeMod(&colType, typeMod);
  code = checkColRef(pTableMeta, colName, pTableMetaSrc->tableInfo.precision, colNameSrc, &colType);

end:
  taosMemoryFreeClear(pTableMeta);
  taosMemoryFreeClear(pTableMetaSrc);
  return code;
}
367

368
/*
 * Validate a column reference that is being altered.
 *
 * Loads the meta of dbName.tbName and of dbNameSrc.tbNameSrc, looks up column
 * colNameSrc in dbNameSrc.tbNameSrc to obtain its data type, and runs checkColRef
 * on dbName.tbName / colName using that type and the precision of
 * dbNameSrc.tbNameSrc.
 *
 * @return terrno when either table meta cannot be loaded,
 *         TSDB_CODE_PAR_INVALID_REF_COLUMN when colNameSrc does not exist,
 *         otherwise the result of checkColRef.
 */
static int32_t checkColRefForAlter(SCatalog* pCatalog, SRequestConnInfo* conn, int32_t acctId, char* dbName, char* tbName, char* colName, 
  char* dbNameSrc, char* tbNameSrc, char* colNameSrc) {
  int32_t code = 0;
  // both pointers are initialized up front: the cleanup at `end` frees both, and the
  // first failure path jumps there before pTableMetaSrc has been assigned
  STableMeta* pTableMeta = NULL;
  STableMeta* pTableMetaSrc = NULL;

  pTableMeta = getTableMeta(pCatalog, conn, dbName, tbName, acctId);
  if (pTableMeta == NULL) {
    code = terrno;
    goto end;
  }
  pTableMetaSrc = getTableMeta(pCatalog, conn, dbNameSrc, tbNameSrc, acctId);
  if (pTableMetaSrc == NULL) {
    code = terrno;
    goto end;
  }
  const SSchema* pSchema = getNormalColSchema(pTableMetaSrc, colNameSrc);
  if (NULL == pSchema) {
    code = TSDB_CODE_PAR_INVALID_REF_COLUMN;
    uError("virtual table's column:\"%s\" not exist", colNameSrc);
    goto end;
  }

  // pick up the extended schema entry of the column when one is available
  int32_t schemaIdx = getNormalColSchemaIndex(pTableMetaSrc, colNameSrc);
  const SSchemaExt* pSchemaExt =
      (schemaIdx >= 0 && pTableMetaSrc->schemaExt && schemaIdx < pTableMetaSrc->tableInfo.numOfColumns)
          ? pTableMetaSrc->schemaExt + schemaIdx
          : NULL;
  SDataType colType = {0};
  schemaToRefDataType(pSchema, NULL != pSchemaExt ? pSchemaExt->typeMod : 0, &colType);
  code = checkColRef(pTableMeta, colName, pTableMetaSrc->tableInfo.precision, pSchema->name, &colType);

end:
  taosMemoryFreeClear(pTableMeta);
  taosMemoryFreeClear(pTableMetaSrc);
  return code;
}
402

403
/*
 * Replay a "create table" batch meta message against the database selected on
 * the connection.
 *
 * The payload that follows the SMsgHead is decoded as an SVCreateTbBatchReq.
 * For every request in the batch:
 *   - the target vgroup is resolved through the catalog;
 *   - for child / virtual child tables the super table's meta is loaded, the
 *     request's suid is replaced by the uid from that meta, column references are
 *     optionally redirected to tmqWriteRefDB and optionally validated
 *     (tmqWriteCheckRef), and tag column ids are rewritten to the ids found in
 *     that meta (the tag blob is rebuilt when any id changed);
 *   - the request is appended to the batch of its vgroup.
 * Child tables whose super table does not exist are skipped.
 * The per-vgroup batches are serialized and launched as TDMT_VND_CREATE_TABLE;
 * on success removeMeta is called for the created tables.
 *
 * @param taos     connection handle; dereferenced as an int64_t id for buildRequest
 * @param meta     raw meta buffer that starts with an SMsgHead
 * @param metaLen  size of meta in bytes; must be at least sizeof(SMsgHead)
 * @return TSDB_CODE_INVALID_PARA for NULL or too-short input,
 *         TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request has no database,
 *         otherwise the code of the first failing step or of the request itself.
 */
static int32_t taosCreateTable(TAOS* taos, void* meta, uint32_t metaLen) {
  // metaLen must cover the SMsgHead, otherwise the payload length computed below wraps around
  if (taos == NULL || meta == NULL || metaLen < sizeof(SMsgHead)) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVCreateTbBatchReq req = {0};
  SDecoder           coder = {0};
  int32_t            code = TSDB_CODE_SUCCESS;
  int32_t            lino = 0;
  SRequestObj*       pRequest = NULL;
  SQuery*            pQuery = NULL;
  SHashObj*          pVgroupHashmap = NULL;

  RAW_LOG_START
  // rebuilt tag blobs are parked here so they stay alive until the request has been sent
  SArray* pTagList = taosArrayInit(0, POINTER_BYTES);
  RAW_NULL_CHECK(pTagList);
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " create table, meta:%p, metaLen:%d, db:%s", LOG_ID_VALUE, meta, metaLen,
         pRequest->pDb ? pRequest->pDb : "NULL");

  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req: the encoded request starts right after the message head
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  RAW_RETURN_CHECK(tDecodeSVCreateTbBatchReq(&coder, &req));
  STscObj* pTscObj = pRequest->pTscObj;

  SVCreateTbReq* pCreateReq = NULL;
  SCatalog*      pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  RAW_NULL_CHECK(pVgroupHashmap);
  taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch);

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
  RAW_NULL_CHECK(pRequest->tableList);
  // loop to create table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;

    SVgroupInfo pInfo = {0};
    SName       pName = {0};
    toName(pTscObj->acctId, pRequest->pDb, pCreateReq->name, &pName);
    RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo));

    pCreateReq->flags |= TD_CREATE_IF_NOT_EXISTS;
    // change tag cid to new cid
    if (pCreateReq->type == TSDB_CHILD_TABLE || pCreateReq->type == TSDB_VIRTUAL_CHILD_TABLE) {
      STableMeta* pTableMeta = NULL;
      SName       sName = {0};
      tb_uid_t    oldSuid = pCreateReq->ctb.suid;
      toName(pTscObj->acctId, pRequest->pDb, pCreateReq->ctb.stbName, &sName);
      code = catalogGetTableMeta(pCatalog, &conn, &sName, &pTableMeta);
      if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
        uInfo(LOG_ID_TAG " super table %s not exist, ignore create child table %s", LOG_ID_VALUE,
              pCreateReq->ctb.stbName, pCreateReq->name);
        code = TSDB_CODE_SUCCESS;
        taosMemoryFreeClear(pTableMeta);
        continue;
      }

      RAW_RETURN_CHECK(code);
      // use the super table uid reported by the catalog
      pCreateReq->ctb.suid = pTableMeta->uid;

      // optionally redirect every column reference to the configured reference database
      bool changeDB = strlen(tmqWriteRefDB) > 0;
      for (int32_t i = 0; changeDB && i < pCreateReq->colRef.nCols; i++) {
        SColRef* pColRef = pCreateReq->colRef.pColRef + i;
        tstrncpy(pColRef->refDbName, tmqWriteRefDB, TSDB_DB_NAME_LEN);
      }

      // optionally validate every column that carries a reference
      for (int32_t i = 0; tmqWriteCheckRef && i < pCreateReq->colRef.nCols && i < pTableMeta->tableInfo.numOfColumns; i++) {
        SColRef* pColRef = pCreateReq->colRef.pColRef + i;
        if (!pColRef || !pColRef->hasRef) continue;
        SSchema* pSchema = pTableMeta->schema + i;
        const SSchemaExt* pSchemaExt =
            (pTableMeta->schemaExt && i < pTableMeta->tableInfo.numOfColumns) ? pTableMeta->schemaExt + i : NULL;
        code = checkColRefForCreate(pCatalog, &conn, pColRef, pTscObj->acctId, pTableMeta->tableInfo.precision, pSchema,
                                    pSchemaExt);
        if (code != TSDB_CODE_SUCCESS) {
          // pTableMeta is local to this iteration; free it before leaving or it leaks
          lino = __LINE__;
          taosMemoryFreeClear(pTableMeta);
          goto end;
        }
      }
      
      SArray* pTagVals = NULL;
      code = tTagToValArray((STag*)pCreateReq->ctb.pTag, &pTagVals);
      if (code != TSDB_CODE_SUCCESS) {
        uError("create tb invalid tag data %s", pCreateReq->name);
        taosMemoryFreeClear(pTableMeta);
        goto end;
      }

      // match each tag name of the request against the tag schemas of the super table
      // (stored after the normal columns) and adopt the column id found there
      bool rebuildTag = false;
      for (int32_t i = 0; i < taosArrayGetSize(pCreateReq->ctb.tagName); i++) {
        char* tName = taosArrayGet(pCreateReq->ctb.tagName, i);
        if (tName == NULL) {
          continue;
        }
        for (int32_t j = pTableMeta->tableInfo.numOfColumns;
             j < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; j++) {
          SSchema* tag = &pTableMeta->schema[j];
          if (strcmp(tag->name, tName) == 0 && tag->type != TSDB_DATA_TYPE_JSON) {
            STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);
            if (pTagVal) {
              if (pTagVal->cid != tag->colId) {
                pTagVal->cid = tag->colId;
                rebuildTag = true;
              }
            } else {
              uError("create tb invalid data %s, size:%d index:%d cid:%d", pCreateReq->name,
                     (int)taosArrayGetSize(pTagVals), i, tag->colId);
            }
          }
        }
      }
      taosMemoryFreeClear(pTableMeta);
      if (rebuildTag) {
        // at least one column id changed: re-encode the tag blob from the patched values
        STag* ppTag = NULL;
        code = tTagNew(pTagVals, 1, false, &ppTag);
        taosArrayDestroy(pTagVals);
        pTagVals = NULL;
        if (code != TSDB_CODE_SUCCESS) {
          goto end;
        }
        if (NULL == taosArrayPush(pTagList, &ppTag)) {
          code = terrno;
          tTagFree(ppTag);
          goto end;
        }
        pCreateReq->ctb.pTag = (uint8_t*)ppTag;
      }
      taosArrayDestroy(pTagVals);
    }
    RAW_NULL_CHECK(taosArrayPush(pRequest->tableList, &pName));

    // group the request by target vgroup
    SVgroupCreateTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
    if (pTableBatch == NULL) {
      SVgroupCreateTableBatch tBatch = {0};
      tBatch.info = pInfo;
      tstrncpy(tBatch.dbName, pRequest->pDb, TSDB_DB_NAME_LEN);

      tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq));
      RAW_NULL_CHECK(tBatch.req.pArray);
      RAW_NULL_CHECK(taosArrayPush(tBatch.req.pArray, pCreateReq));
      tBatch.req.source = TD_REQ_FROM_TAOX;
      RAW_RETURN_CHECK(taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch)));
    } else {  // add to the correct vgroup
      RAW_NULL_CHECK(taosArrayPush(pTableBatch->req.pArray, pCreateReq));
    }
  }

  // every request was skipped: nothing to send
  if (taosHashGetSize(pVgroupHashmap) == 0) {
    goto end;
  }
  SArray* pBufArray = NULL;
  RAW_RETURN_CHECK(serializeVgroupsCreateTableBatch(pVgroupHashmap, &pBufArray));
  pQuery = NULL;
  code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
  if (TSDB_CODE_SUCCESS != code) goto end;
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_CREATE_TABLE;
  pQuery->stableQuery = false;
  code = nodesMakeNode(QUERY_NODE_CREATE_TABLE_STMT, &pQuery->pRoot);
  if (TSDB_CODE_SUCCESS != code) goto end;
  RAW_NULL_CHECK(pQuery->pRoot);

  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pBufArray));

  launchQueryImpl(pRequest, pQuery, true, NULL);
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    RAW_RETURN_CHECK(removeMeta(pTscObj, pRequest->tableList, false));
  }

  code = pRequest->code;
  uDebug(LOG_ID_TAG " create table return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  tDeleteSVCreateTbBatchReq(&req);

  taosHashCleanup(pVgroupHashmap);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  taosArrayDestroyP(pTagList, NULL);
  RAW_LOG_END
  return code;
}
599

600
/*
 * Drop-table requests collected for one vgroup; destroyDropTbReqBatch releases
 * req.pArray.
 * NOTE(review): presumably filled the same way as SVgroupCreateTableBatch (one
 * entry per vgId) — confirm against taosDropTable.
 */
typedef struct SVgroupDropTableBatch {
601
  SVDropTbBatchReq req;  // req.pArray is the array freed by destroyDropTbReqBatch
602
  SVgroupInfo      info;
603
  char             dbName[TSDB_DB_NAME_LEN];
604
} SVgroupDropTableBatch;
605

606
static void destroyDropTbReqBatch(void* data) {
1,623✔
607
  if (data == NULL) {
1,623✔
UNCOV
608
    uError("invalid parameter in %s", __func__);
×
609
    return;
×
610
  }
611
  SVgroupDropTableBatch* pTbBatch = (SVgroupDropTableBatch*)data;
1,623✔
612
  taosArrayDestroy(pTbBatch->req.pArray);
1,623✔
613
}
614

615
/*
 * Replays a batched drop-table meta message.
 *
 * The payload after the SMsgHead is decoded as an SVDropTbBatchReq. Each table
 * is looked up in the catalog; tables that no longer exist are skipped, the
 * rest are grouped per vgroup, serialized and executed synchronously as a
 * TDMT_VND_DROP_TABLE query. On success the cached meta of the dropped tables
 * is removed.
 *
 * @param taos     connection handle (dereferenced as an int64_t connection id)
 * @param meta     raw message: SMsgHead followed by the encoded request
 * @param metaLen  total length of `meta` in bytes
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for NULL/too-short input,
 *         or the error code of the failing step.
 */
static int32_t taosDropTable(TAOS* taos, void* meta, uint32_t metaLen) {
  if (taos == NULL || meta == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  // a message shorter than its header would make `metaLen - sizeof(SMsgHead)` wrap around
  if (metaLen < sizeof(SMsgHead)) {
    uError("invalid meta length:%u in %s", metaLen, __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVDropTbBatchReq req = {0};
  SDecoder         coder = {0};
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SRequestObj*     pRequest = NULL;
  SQuery*          pQuery = NULL;
  SHashObj*        pVgroupHashmap = NULL;
  SArray*          pPendingArray = NULL;  // batch array created but not yet owned by pVgroupHashmap

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " drop table, meta:%p, len:%d", LOG_ID_VALUE, meta, metaLen);

  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  RAW_RETURN_CHECK(tDecodeSVDropTbBatchReq(&coder, &req));
  STscObj* pTscObj = pRequest->pTscObj;

  SVDropTbReq* pDropReq = NULL;
  SCatalog*    pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));

  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  RAW_NULL_CHECK(pVgroupHashmap);
  taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
  pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
  RAW_NULL_CHECK(pRequest->tableList);
  // loop over the tables to drop and group them by vgroup
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pDropReq = req.pReqs + iReq;
    pDropReq->igNotExists = true;
    //    pDropReq->suid = processSuid(pDropReq->suid, pRequest->pDb);

    SVgroupInfo pInfo = {0};
    SName       pName = {0};
    toName(pTscObj->acctId, pRequest->pDb, pDropReq->name, &pName);
    code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
    PROCESS_TABLE_NOT_EXIST(code, pDropReq->name)
    STableMeta* pTableMeta = NULL;
    code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
      code = TSDB_CODE_SUCCESS;
      uInfo(LOG_ID_TAG " table %s not exist, ignore drop", LOG_ID_VALUE, pDropReq->name);
      taosMemoryFreeClear(pTableMeta);
      continue;
    }
    RAW_RETURN_CHECK(code);
    // the suid carried in the message is replaced by the one known to the local catalog
    tb_uid_t oldSuid = pDropReq->suid;
    pDropReq->suid = pTableMeta->suid;
    taosMemoryFreeClear(pTableMeta);
    uDebug(LOG_ID_TAG " drop table name:%s suid:%" PRId64 " new suid:%" PRId64, LOG_ID_VALUE, pDropReq->name, oldSuid,
           pDropReq->suid);

    RAW_NULL_CHECK(taosArrayPush(pRequest->tableList, &pName));
    SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
    if (pTableBatch == NULL) {
      SVgroupDropTableBatch tBatch = {0};
      tBatch.info = pInfo;
      // tracked in pPendingArray so that a failure before taosHashPut succeeds does not leak it
      pPendingArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
      RAW_NULL_CHECK(pPendingArray);
      tBatch.req.pArray = pPendingArray;
      RAW_NULL_CHECK(taosArrayPush(tBatch.req.pArray, pDropReq));
      RAW_RETURN_CHECK(taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch)));
      pPendingArray = NULL;  // now released through destroyDropTbReqBatch
    } else {  // add to the correct vgroup
      RAW_NULL_CHECK(taosArrayPush(pTableBatch->req.pArray, pDropReq));
    }
  }

  // every table was already gone: nothing to send
  if (taosHashGetSize(pVgroupHashmap) == 0) {
    goto end;
  }
  // NOTE(review): pBufArray is not released here if a step before/inside rewriteToVnodeModifyOpStmt fails;
  // confirm the ownership contract of serializeVgroupsDropTableBatch / rewriteToVnodeModifyOpStmt.
  SArray* pBufArray = NULL;
  RAW_RETURN_CHECK(serializeVgroupsDropTableBatch(pVgroupHashmap, &pBufArray));
  code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
  if (TSDB_CODE_SUCCESS != code) goto end;
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_DROP_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = NULL;
  code = nodesMakeNode(QUERY_NODE_DROP_TABLE_STMT, &pQuery->pRoot);
  if (TSDB_CODE_SUCCESS != code) goto end;
  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pBufArray));

  launchQueryImpl(pRequest, pQuery, true, NULL);
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    RAW_RETURN_CHECK(removeMeta(pTscObj, pRequest->tableList, false));
  }
  code = pRequest->code;
  uDebug(LOG_ID_TAG " drop table return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  taosArrayDestroy(pPendingArray);
  taosHashCleanup(pVgroupHashmap);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  RAW_LOG_END
  return code;
}
728

729
/*
 * Replays a delete-data meta message by turning it into a
 * "delete from ... where ts >= skey and ts <= ekey" statement and running it.
 *
 * "Table not exist" / "get meta error" results of the statement are treated as
 * success, so a delete for a table that is already gone is a no-op.
 *
 * @param taos     connection handle (dereferenced as an int64_t connection id)
 * @param meta     raw message: SMsgHead followed by an encoded SDeleteRes
 * @param metaLen  total length of `meta` in bytes
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for NULL/too-short input or
 *         a statement that does not fit the SQL buffer, or the failing step's code.
 */
static int32_t taosDeleteData(TAOS* taos, void* meta, uint32_t metaLen) {
  if (taos == NULL || meta == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  // a message shorter than its header would make `metaLen - sizeof(SMsgHead)` wrap around
  if (metaLen < sizeof(SMsgHead)) {
    uError("invalid meta length:%u in %s", metaLen, __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SDeleteRes req = {0};
  SDecoder   coder = {0};
  // sized generously; the truncation check below is what actually guarantees correctness
  char       sql[1024] = {0};
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  uDebug("connId:0x%" PRIx64 " delete data, meta:%p, len:%d", *(int64_t*)taos, meta, metaLen);

  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  RAW_RETURN_CHECK(tDecodeDeleteRes(&coder, &req));
  int sqlLen = snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64,
                        req.tableFName, req.tsColName, req.skey, req.tsColName, req.ekey);
  // never execute a truncated statement: a cut-off upper bound would delete a different time range
  if (sqlLen < 0 || (size_t)sqlLen >= sizeof(sql)) {
    uError("connId:0x%" PRIx64 " delete data sql too long or format failed, len:%d", *(int64_t*)taos, sqlLen);
    code = TSDB_CODE_INVALID_PARA;
    lino = __LINE__;
    goto end;
  }

  TAOS_RES* res = taosQueryImpl(taos, sql, false, TD_REQ_FROM_TAOX);
  RAW_NULL_CHECK(res);
  SRequestObj* pRequest = (SRequestObj*)res;
  code = pRequest->code;
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_PAR_GET_META_ERROR) {
    code = TSDB_CODE_SUCCESS;
  }
  taos_free_result(res);
  uDebug("connId:0x%" PRIx64 " delete data sql:%s, code:%s", *(int64_t*)taos, sql, tstrerror(code));

end:
  RAW_LOG_END
  tDecoderClear(&coder);
  return code;
}
764

765
/*
 * Replays an alter-table meta message.
 *
 * The payload after the SMsgHead is decoded as an SVAlterTbReq and dispatched
 * by action:
 *   - TSDB_ALTER_TABLE_UPDATE_OPTIONS: ignored.
 *   - TSDB_ALTER_TABLE_UPDATE_MULTI_TABLE_TAG_VAL: the listed tables are grouped
 *     by vgroup and one encoded request is built per vgroup.
 *   - TSDB_ALTER_TABLE_UPDATE_CHILD_TABLE_TAG_VAL: the same request is encoded
 *     once for every vgroup of the current database.
 *   - anything else: a single request for the vgroup that hosts req.tbName.
 * The encoded messages are executed synchronously as a TDMT_VND_ALTER_TABLE
 * query; "table not exist"/"not found" results are treated as success.
 *
 * Ownership: each message buffer is attached to its SVgDataBlocks as soon as
 * both exist, and each SVgDataBlocks is either in pArray or in pVgData, so the
 * cleanup at `end` frees everything exactly once.
 *
 * @param taos     connection handle (dereferenced as an int64_t connection id)
 * @param meta     raw message: SMsgHead followed by the encoded request
 * @param metaLen  total length of `meta` in bytes
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for NULL/too-short/empty
 *         input, or the error code of the failing step.
 */
static int32_t taosAlterTable(TAOS* taos, void* meta, uint32_t metaLen) {
  if (taos == NULL || meta == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  // a message shorter than its header would make `metaLen - sizeof(SMsgHead)` wrap around
  if (metaLen < sizeof(SMsgHead)) {
    uError("invalid meta length:%u in %s", metaLen, __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVAlterTbReq   req = {0};
  SDecoder       dcoder = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SRequestObj*   pRequest = NULL;
  SQuery*        pQuery = NULL;
  SArray*        pArray = NULL;
  SVgDataBlocks* pVgData = NULL;
  SArray*        pVgList = NULL;
  SEncoder       coder = {0};
  SHashObj*      pVgroupHashmap = NULL;
  SArray*        pPendingTables = NULL;  // per-vnode array created but not yet stored in pVgroupHashmap

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " alter table, meta:%p, len:%d", LOG_ID_VALUE, meta, metaLen);
  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&dcoder, data, len);
  RAW_RETURN_CHECK(tDecodeSVAlterTbReq(&dcoder, &req));
  // do not deal TSDB_ALTER_TABLE_UPDATE_OPTIONS
  if (req.action == TSDB_ALTER_TABLE_UPDATE_OPTIONS) {
    uInfo(LOG_ID_TAG " alter table action is UPDATE_OPTIONS, ignore", LOG_ID_VALUE);
    goto end;
  }

  STscObj*  pTscObj = pRequest->pTscObj;
  SCatalog* pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  // Handle Type 1 batch modification with vnode grouping
  if (req.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TABLE_TAG_VAL) {
    if (req.tables == NULL || taosArrayGetSize(req.tables) == 0) {
      uError(LOG_ID_TAG " Type 1 batch alter has empty tables array", LOG_ID_VALUE);
      code = TSDB_CODE_INVALID_PARA;
      goto end;
    }

    int32_t nTables = taosArrayGetSize(req.tables);
    uDebug(LOG_ID_TAG " Type 1 batch alter with %d tables, grouping by vnode", LOG_ID_VALUE, nTables);

    // Create hashmap to group tables by vgId; values are SArray* of SUpdateTableTagVal
    pVgroupHashmap = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
    RAW_NULL_CHECK(pVgroupHashmap);

    // Group tables by vnode
    for (int32_t i = 0; i < nTables; i++) {
      SUpdateTableTagVal* pTable = taosArrayGet(req.tables, i);
      if (pTable == NULL || pTable->tbName == NULL) {
        uWarn(LOG_ID_TAG " Type 1 batch alter table[%d] has invalid name, skip", LOG_ID_VALUE, i);
        continue;
      }

      // Query vnode for this table
      SVgroupInfo vgInfo = {0};
      SName       pName = {0};
      toName(pTscObj->acctId, pRequest->pDb, pTable->tbName, &pName);
      code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgInfo);
      PROCESS_TABLE_NOT_EXIST(code, pTable->tbName)

      // Add table to corresponding vnode's array
      SArray*  pTables = NULL;
      SArray** ppTables = taosHashGet(pVgroupHashmap, &vgInfo.vgId, sizeof(int32_t));
      if (ppTables != NULL) {
        pTables = *ppTables;
      } else {
        // tracked in pPendingTables until the hashmap holds it, so a failed put does not leak it
        pPendingTables = taosArrayInit(16, sizeof(SUpdateTableTagVal));
        RAW_NULL_CHECK(pPendingTables);
        RAW_RETURN_CHECK(
            taosHashPut(pVgroupHashmap, &vgInfo.vgId, sizeof(int32_t), &pPendingTables, sizeof(void*)));
        // the hashmap stores the pointer value, so the local handle refers to the same array
        pTables = pPendingTables;
        pPendingTables = NULL;
      }

      RAW_NULL_CHECK(taosArrayPush(pTables, pTable));
    }

    // Build and send separate request for each vnode
    pArray = taosArrayInit(taosHashGetSize(pVgroupHashmap), sizeof(void*));
    RAW_NULL_CHECK(pArray);

    void* pIter = taosHashIterate(pVgroupHashmap, NULL);
    while (pIter) {
      size_t   keyLen = 0;
      int32_t* pVgId = taosHashGetKey(pIter, &keyLen);
      SArray*  pTables = *(SArray**)pIter;
      int32_t  nTablesInVg = taosArrayGetSize(pTables);

      uDebug(LOG_ID_TAG " Type 1 batch alter: vgId:%d has %d tables", LOG_ID_VALUE, *pVgId, nTablesInVg);

      // Build SVAlterTbReq for this vnode
      SVAlterTbReq vgReq = {0};
      vgReq.action = req.action;
      vgReq.tbName = req.tbName;
      vgReq.source = TD_REQ_FROM_TAOX;
      vgReq.tables = pTables;

      // Encode request
      int tlen = 0;
      tEncodeSize(tEncodeSVAlterTbReq, &vgReq, tlen, code);
      if (code < 0) {
        uError(LOG_ID_TAG " Type 1 batch alter encode failed for vgId:%d, code:%s",
               LOG_ID_VALUE, *pVgId, tstrerror(code));
        taosHashCancelIterate(pVgroupHashmap, pIter);
        goto end;
      }

      tlen += sizeof(SMsgHead);
      void* pMsg = taosMemoryMalloc(tlen);
      if (pMsg == NULL) {
        code = terrno;
        uError(LOG_ID_TAG " Type 1 batch alter malloc failed for vgId:%d, size:%d",
               LOG_ID_VALUE, *pVgId, tlen);
        taosHashCancelIterate(pVgroupHashmap, pIter);
        goto end;
      }

      ((SMsgHead*)pMsg)->vgId = htonl(*pVgId);
      ((SMsgHead*)pMsg)->contLen = htonl(tlen);
      void* pBuf = POINTER_SHIFT(pMsg, sizeof(SMsgHead));

      SEncoder vgCoder = {0};
      tEncoderInit(&vgCoder, pBuf, tlen - sizeof(SMsgHead));
      code = tEncodeSVAlterTbReq(&vgCoder, &vgReq);
      tEncoderClear(&vgCoder);

      if (code < 0) {
        uError(LOG_ID_TAG " Type 1 batch alter encode2 failed for vgId:%d, code:%s",
               LOG_ID_VALUE, *pVgId, tstrerror(code));
        taosMemoryFree(pMsg);
        taosHashCancelIterate(pVgroupHashmap, pIter);
        goto end;
      }

      // Query vgroup info for first table to get endpoint
      SUpdateTableTagVal* pFirstTable = taosArrayGet(pTables, 0);
      SVgroupInfo         vgInfo = {0};
      SName               pName = {0};
      toName(pTscObj->acctId, pRequest->pDb, pFirstTable->tbName, &pName);
      code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgInfo);
      if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
        taosMemoryFree(pMsg);
        pIter = taosHashIterate(pVgroupHashmap, pIter);
        code = TSDB_CODE_SUCCESS;
        continue;
      }
      if (code != TSDB_CODE_SUCCESS) {
        taosMemoryFree(pMsg);
        taosHashCancelIterate(pVgroupHashmap, pIter);
        goto end;
      }

      // Create VgDataBlocks for this vnode
      pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
      if (pVgData == NULL) {
        code = terrno;
        taosMemoryFree(pMsg);
        taosHashCancelIterate(pVgroupHashmap, pIter);
        goto end;
      }
      pVgData->vg = vgInfo;
      pVgData->pData = pMsg;  // pVgData owns pMsg from here on
      pVgData->size = tlen;
      pVgData->numOfTables = nTablesInVg;

      if (taosArrayPush(pArray, &pVgData) == NULL) {
        code = terrno;
        // pVgData and the pMsg it owns are released at `end`; freeing pMsg here as well would double-free it
        taosHashCancelIterate(pVgroupHashmap, pIter);
        goto end;
      }

      pVgData = NULL;  // Ownership transferred to pArray
      pIter = taosHashIterate(pVgroupHashmap, pIter);
    }

    uInfo(LOG_ID_TAG " Type 1 batch alter: grouped %d tables into %d vnodes",
          LOG_ID_VALUE, nTables, (int32_t)taosArrayGetSize(pArray));
  } else if (req.action == TSDB_ALTER_TABLE_UPDATE_CHILD_TABLE_TAG_VAL) {
    char  dbFName[TSDB_DB_FNAME_LEN] = {0};
    SName pName = {TSDB_TABLE_NAME_T, pTscObj->acctId, {0}, {0}};
    tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname));
    (void)tNameGetFullDbName(&pName, dbFName);
    RAW_RETURN_CHECK(catalogGetDBVgList(pCatalog, &conn, dbFName, &pVgList));

    pArray = taosArrayInit(taosArrayGetSize(pVgList), sizeof(void*));
    RAW_NULL_CHECK(pArray);
    // the same request is encoded once per vgroup of the database
    for (int i = 0; i < taosArrayGetSize(pVgList); ++i) {
      SVgroupInfo* pInfo = (SVgroupInfo*)taosArrayGet(pVgList, i);
      pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
      RAW_NULL_CHECK(pVgData);
      pVgData->vg = *pInfo;

      int tlen = 0;
      req.source = TD_REQ_FROM_TAOX;

      tEncodeSize(tEncodeSVAlterTbReq, &req, tlen, code);
      RAW_RETURN_CHECK(code);
      tlen += sizeof(SMsgHead);
      void* pMsg = taosMemoryMalloc(tlen);
      RAW_NULL_CHECK(pMsg);
      // attach immediately so that `end` frees the buffer if encoding or the push below fails
      pVgData->pData = pMsg;
      pVgData->size = tlen;
      ((SMsgHead*)pMsg)->vgId = htonl(pInfo->vgId);
      ((SMsgHead*)pMsg)->contLen = htonl(tlen);
      void* pBuf = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
      tEncoderInit(&coder, pBuf, tlen - sizeof(SMsgHead));
      RAW_RETURN_CHECK(tEncodeSVAlterTbReq(&coder, &req));
      tEncoderClear(&coder);

      pVgData->numOfTables = 1;
      RAW_NULL_CHECK(taosArrayPush(pArray, &pVgData));
      pVgData = NULL;  // Ownership transferred to pArray
    }
  } else {
    // Single table or Type 2 modification - original logic
    SVgroupInfo pInfo = {0};
    SName       pName = {0};
    toName(pTscObj->acctId, pRequest->pDb, req.tbName, &pName);
    code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
      code = TSDB_CODE_SUCCESS;
      goto end;
    }
    RAW_RETURN_CHECK(code);
    pArray = taosArrayInit(1, sizeof(void*));
    RAW_NULL_CHECK(pArray);

    pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
    RAW_NULL_CHECK(pVgData);
    pVgData->vg = pInfo;

    int tlen = 0;
    req.source = TD_REQ_FROM_TAOX;

    if (strlen(tmqWriteRefDB) > 0) {
      req.refDbName = tmqWriteRefDB;
    }

    if (req.action == TSDB_ALTER_TABLE_ALTER_COLUMN_REF && tmqWriteCheckRef) {
      RAW_RETURN_CHECK(checkColRefForAlter(pCatalog, &conn, pTscObj->acctId, req.refDbName, req.refTbName,
                                           req.refColName, pRequest->pDb, req.tbName, req.colName));
    } else if (req.action == TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COLUMN_REF && tmqWriteCheckRef) {
      RAW_RETURN_CHECK(checkColRefForAdd(pCatalog, &conn, pTscObj->acctId, req.refDbName, req.refTbName,
                                         req.refColName, pRequest->pDb, req.tbName, req.colName, req.type, req.bytes,
                                         req.typeMod));
    }

    tEncodeSize(tEncodeSVAlterTbReq, &req, tlen, code);
    RAW_RETURN_CHECK(code);
    tlen += sizeof(SMsgHead);
    void* pMsg = taosMemoryMalloc(tlen);
    RAW_NULL_CHECK(pMsg);
    // attach immediately so that `end` frees the buffer if encoding or the push below fails
    pVgData->pData = pMsg;
    pVgData->size = tlen;
    ((SMsgHead*)pMsg)->vgId = htonl(pInfo.vgId);
    ((SMsgHead*)pMsg)->contLen = htonl(tlen);
    void* pBuf = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
    tEncoderInit(&coder, pBuf, tlen - sizeof(SMsgHead));
    RAW_RETURN_CHECK(tEncodeSVAlterTbReq(&coder, &req));

    pVgData->numOfTables = 1;
    RAW_NULL_CHECK(taosArrayPush(pArray, &pVgData));
    pVgData = NULL;
  }

  pQuery = NULL;
  RAW_RETURN_CHECK(nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery));
  if (NULL == pQuery) goto end;
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_ALTER_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = NULL;
  RAW_RETURN_CHECK(nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT, &pQuery->pRoot));
  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pArray));

  launchQueryImpl(pRequest, pQuery, true, NULL);
  // pArray is no longer released by this function once the query has been launched
  pArray = NULL;

  code = pRequest->code;
  if (code == TSDB_CODE_TDB_TABLE_NOT_EXIST || code == TSDB_CODE_NOT_FOUND) {
    code = TSDB_CODE_SUCCESS;
  }

  if (pRequest->code == TSDB_CODE_SUCCESS) {
    SExecResult* pRes = &pRequest->body.resInfo.execRes;
    if (pRes->res != NULL) {
      code = handleAlterTbExecRes(pRes->res, pCatalog);
    }
  }
  uDebug(LOG_ID_TAG " alter table return, meta:%p, len:%d, msg:%s", LOG_ID_VALUE, meta, metaLen, tstrerror(code));

end:
  // Cleanup vnode grouping hashmap; its values are plain SArray* that the map does not free itself
  if (pVgroupHashmap != NULL) {
    void* pIter = taosHashIterate(pVgroupHashmap, NULL);
    while (pIter) {
      SArray* pTables = *(SArray**)pIter;
      taosArrayDestroy(pTables);
      pIter = taosHashIterate(pVgroupHashmap, pIter);
    }
    taosHashCleanup(pVgroupHashmap);
  }
  taosArrayDestroy(pPendingTables);

  // pArray is only non-NULL here when the query was never launched
  for (int i = 0; i < taosArrayGetSize(pArray); ++i) {
    SVgDataBlocks* pData = (SVgDataBlocks*)taosArrayGetP(pArray, i);
    if (pData) {
      taosMemoryFreeClear(pData->pData);
      taosMemoryFreeClear(pData);
    }
  }
  taosArrayDestroy(pArray);

  // a block that was built but never pushed into pArray
  if (pVgData) {
    taosMemoryFreeClear(pVgData->pData);
    taosMemoryFreeClear(pVgData);
  }
  taosArrayDestroy(pVgList);
  destroyRequest(pRequest);
  tDecoderClear(&dcoder);
  qDestroyQuery(pQuery);
  destroyAlterTbReq(&req);
  tEncoderClear(&coder);
  RAW_LOG_END
  return code;
}
1102

1103
/*
 * Writes one raw block into table `tbname`, passing a caller-supplied field
 * list through. Forwards to taos_write_raw_block_with_fields_with_reqid with
 * a request id of 0 and returns its result unchanged.
 */
int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const char* tbname, TAOS_FIELD* fields,
                                     int numFields) {
  int rc = taos_write_raw_block_with_fields_with_reqid(taos, rows, pData, tbname, fields, numFields, 0);
  return rc;
}
1107

1108
int taos_write_raw_block_with_fields_with_reqid(TAOS* taos, int rows, char* pData, const char* tbname,
2,512✔
1109
                                                TAOS_FIELD* fields, int numFields, int64_t reqid) {
1110
  if (taos == NULL || pData == NULL || tbname == NULL) {
2,512✔
UNCOV
1111
    uError("invalid parameter in %s, taos:%p, pData:%p, tbname:%p", __func__, taos, pData, tbname);
×
1112
    return TSDB_CODE_INVALID_PARA;
×
1113
  }
1114
  int32_t     code = TSDB_CODE_SUCCESS;
2,512✔
1115
  int32_t     lino = 0;
2,512✔
1116
  STableMeta* pTableMeta = NULL;
2,512✔
1117
  SQuery*     pQuery = NULL;
2,512✔
1118
  SHashObj*   pVgHash = NULL;
2,512✔
1119

1120
  SRequestObj* pRequest = NULL;
2,512✔
1121
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, reqid));
2,512✔
1122

1123
  uDebug(LOG_ID_TAG " write raw block, rows:%d, pData:%p, tbname:%s, fields:%p, numFields:%d", LOG_ID_VALUE,
2,512✔
1124
         rows, pData, tbname, fields, numFields);
1125

1126
  pRequest->syncQuery = true;
2,512✔
1127
  if (!pRequest->pDb) {
2,512✔
UNCOV
1128
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
×
1129
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
1130
    goto end;
×
1131
  }
1132

1133
  SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
2,512✔
1134
  tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname));
2,512✔
1135
  tstrncpy(pName.tname, tbname, sizeof(pName.tname));
2,512✔
1136

1137
  struct SCatalog* pCatalog = NULL;
2,512✔
1138
  RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));
2,512✔
1139

1140
  SRequestConnInfo conn = {0};
2,512✔
1141
  conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
2,512✔
1142
  conn.requestId = pRequest->requestId;
2,512✔
1143
  conn.requestObjRefId = pRequest->self;
2,512✔
1144
  conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
2,512✔
1145

1146
  SVgroupInfo vgData = {0};
2,512✔
1147
  RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData));
2,512✔
1148
  RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta));
2,512✔
1149
  uDebug(LOG_ID_TAG " write raw block got meta, tbname:%s, numOfColumns:%d", LOG_ID_VALUE,
2,198✔
1150
         tbname, pTableMeta->tableInfo.numOfColumns);
1151
  RAW_RETURN_CHECK(smlInitHandle(&pQuery));
2,198✔
1152
  pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
2,198✔
1153
  RAW_NULL_CHECK(pVgHash);
2,198✔
1154
  RAW_RETURN_CHECK(
2,198✔
1155
      taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData)));
1156
  RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false, NULL, 0, false));
2,198✔
1157
  RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
1,570✔
1158

1159
  launchQueryImpl(pRequest, pQuery, true, NULL);
1,570✔
1160
  code = pRequest->code;
1,570✔
1161
  uDebug(LOG_ID_TAG " write raw block return, tbname:%s, msg:%s", LOG_ID_VALUE, tbname, tstrerror(code));
1,570✔
1162

1163
end:
2,512✔
1164
  taosMemoryFreeClear(pTableMeta);
2,512✔
1165
  qDestroyQuery(pQuery);
2,512✔
1166
  destroyRequest(pRequest);
2,512✔
1167
  taosHashCleanup(pVgHash);
2,512✔
1168
  RAW_LOG_END
2,512✔
1169
  return code;
2,512✔
1170
}
1171

1172
/*
 * Writes one raw block into table `tbname` without a field list.
 * Forwards to taos_write_raw_block_with_fields_with_reqid with fields = NULL,
 * numFields = 0 and a request id of 0.
 */
int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) {
  int rc = taos_write_raw_block_with_fields_with_reqid(taos, rows, pData, tbname, NULL, 0, 0);
  return rc;
}
1175

UNCOV
1176
/*
 * Writes one raw block into table `tbname` without a field list, using the
 * caller's request id. Forwards to taos_write_raw_block_with_fields_with_reqid
 * with fields = NULL and numFields = 0.
 */
int taos_write_raw_block_with_reqid(TAOS* taos, int rows, char* pData, const char* tbname, int64_t reqid) {
  int rc = taos_write_raw_block_with_fields_with_reqid(taos, rows, pData, tbname, NULL, 0, reqid);
  return rc;
}
1179

1180
static void* getRawDataFromRes(void* pRetrieve) {
54,915✔
1181
  if (pRetrieve == NULL) {
54,915✔
UNCOV
1182
    uError("invalid parameter in %s", __func__);
×
1183
    return NULL;
×
1184
  }
1185
  void* rawData = NULL;
54,915✔
1186
  // deal with compatibility
1187
  if (*(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_VERSION) {
54,915✔
UNCOV
1188
    rawData = ((SRetrieveTableRsp*)pRetrieve)->data;
×
1189
  } else if (*(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_TMQ_VERSION ||
54,915✔
UNCOV
1190
             *(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_TMQ_RAW_VERSION) {
×
1191
    rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
54,915✔
1192
  }
1193
  return rawData;
54,915✔
1194
}
1195

1196
/*
 * Decodes every create-table request carried in `rsp` and stores it in
 * `pHashObj`, keyed by table name.
 *
 * Only child-table requests are accepted; any other type aborts with
 * TSDB_CODE_INVALID_MSG. When a name is already present in the map the newly
 * decoded duplicate is destroyed and the existing entry is kept.
 *
 * @param rsp       response holding createTableNum entries in createTableReq /
 *                  createTableLen
 * @param pHashObj  destination map (table name -> SVCreateTbReq, stored by value)
 * @return 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, otherwise
 *         the error code of the failing step.
 */
static int32_t buildCreateTbMap(SMqDataRsp* rsp, SHashObj* pHashObj) {
  if (rsp == NULL || pHashObj == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t       code = 0;
  int32_t       lino = 0;
  SVCreateTbReq pCreateReq = {0};
  SDecoder      decoderTmp = {0};
  RAW_LOG_START

  // find schema data info
  for (int idx = 0; idx < rsp->createTableNum; idx++) {
    void** ppReq = taosArrayGet(rsp->createTableReq, idx);
    RAW_NULL_CHECK(ppReq);
    int32_t* pReqLen = taosArrayGet(rsp->createTableLen, idx);
    RAW_NULL_CHECK(pReqLen);

    tDecoderInit(&decoderTmp, *ppReq, *pReqLen);
    RAW_RETURN_CHECK(tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq));

    if (pCreateReq.type != TSDB_CHILD_TABLE) {
      uError("invalid table type %d in %s", pCreateReq.type, __func__);
      code = TSDB_CODE_INVALID_MSG;
      goto end;
    }

    size_t nameLen = strlen(pCreateReq.name);
    if (taosHashGet(pHashObj, pCreateReq.name, nameLen) != NULL) {
      // duplicate name: keep the entry already stored, drop this one
      tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
    } else {
      RAW_RETURN_CHECK(taosHashPut(pHashObj, pCreateReq.name, nameLen, &pCreateReq, sizeof(SVCreateTbReq)));
    }

    tDecoderClear(&decoderTmp);
    // reset so the cleanup at `end` does not touch a request that was stored or already destroyed
    pCreateReq = (SVCreateTbReq){0};
  }

end:
  tDecoderClear(&decoderTmp);
  tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
  RAW_LOG_END
  return code;
}
1238

1239
// States for the one-time initialisation tracked by `initedFlag` (which starts at WRITE_RAW_INIT_START).
// NOTE(review): the transitions to OK / FAIL are not in the code visible here — confirm against the init routine.
typedef enum {
1240
  WRITE_RAW_INIT_START = 0,
1241
  WRITE_RAW_INIT_OK,
1242
  WRITE_RAW_INIT_FAIL,
1243
} WRITE_RAW_INIT_STATUS;
1244

1245
// Process-wide cache created by initRawCacheHash; getRawCache stores one rawCacheInfo per key pointer value.
static SHashObj* writeRawCache = NULL;
1246
// NOTE(review): initFlag / initedFlag are not read or written in the code visible here;
// presumably they guard one-time initialisation of writeRawCache — confirm against their users.
static int8_t    initFlag = 0;
1247
static int8_t    initedFlag = WRITE_RAW_INIT_START;
1248

1249
// Value type stored in writeRawCache: the three hash tables created together by getRawCache
// and released together by freeRawCache. Field order matters: getRawCache initialises it positionally.
typedef struct {
1250
  // created with the TSDB_DATA_TYPE_INT hash function
  SHashObj* pVgHash;
1251
  // created with the TSDB_DATA_TYPE_BINARY hash function
  SHashObj* pNameHash;
1252
  // created with the TSDB_DATA_TYPE_BIGINT hash function; values are freed by tmqFreeMeta
  SHashObj* pMetaHash;
1253
} rawCacheInfo;
1254

1255
// Table descriptor: vgroup info plus table uid and super-table uid.
// NOTE(review): not used in the code visible here; presumably the value type of one of the raw-write caches — confirm.
typedef struct {
1256
  SVgroupInfo vgInfo;
1257
  int64_t     uid;
1258
  int64_t     suid;
1259
} tbInfo;
1260

1261
static void tmqFreeMeta(void* data) {
15,061✔
1262
  if (data == NULL) {
15,061✔
UNCOV
1263
    uError("invalid parameter in %s", __func__);
×
1264
    return;
×
1265
  }
1266
  STableMeta* pTableMeta = *(STableMeta**)data;
15,061✔
1267
  taosMemoryFree(pTableMeta);
15,061✔
1268
}
1269

UNCOV
1270
static void freeRawCache(void* data) {
×
1271
  if (data == NULL) {
×
1272
    uError("invalid parameter in %s", __func__);
×
1273
    return;
×
1274
  }
UNCOV
1275
  rawCacheInfo* pRawCache = (rawCacheInfo*)data;
×
1276
  taosHashCleanup(pRawCache->pMetaHash);
×
1277
  taosHashCleanup(pRawCache->pNameHash);
×
1278
  taosHashCleanup(pRawCache->pVgHash);
×
1279
}
1280

1281
static int32_t initRawCacheHash() {
7,503✔
1282
  if (writeRawCache == NULL) {
7,503✔
1283
    writeRawCache = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
7,503✔
1284
    if (writeRawCache == NULL) {
7,503✔
UNCOV
1285
      return terrno;
×
1286
    }
1287
    taosHashSetFreeFp(writeRawCache, freeRawCache);
7,503✔
1288
  }
1289
  return 0;
7,503✔
1290
}
1291

1292
/*
 * Decides whether the cached table meta is stale with respect to an incoming
 * raw block and its schema wrapper.
 *
 * Returns true when the column counts differ, when a column named in `pSW`
 * has no same-named column in `pTableMeta`, or when checkSchema reports a
 * mismatch between the cached column and the block's per-column schema entry.
 * Returns false otherwise, and also when any argument is NULL.
 *
 * @param rawData     raw block; its column schema entries start right after the header
 * @param pTableMeta  cached table meta
 * @param pSW         schema wrapper describing the block's columns, in block order
 */
static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrapper* pSW) {
  if (rawData == NULL || pSW == NULL) {
    return false;
  }
  if (pTableMeta == NULL) {
    uError("invalid parameter in %s", __func__);
    return false;
  }

  // Raw block header layout:
  // | version(int32) | totalLen(int32) | totalRows(int32) | blankFill(int32) | totalCols(int32) | flagSeg(uint64) | columnSchema... | colLen... |
  const int32_t rawBlockHeaderSize = sizeof(int32_t) * 5 + sizeof(uint64_t);
  // each column schema entry occupies an int8 followed by an int32
  const size_t  colSchemaSize = sizeof(int8_t) + sizeof(int32_t);
  int8_t*       firstColSchema = (int8_t*)((char*)rawData + rawBlockHeaderSize);

  if (pSW->nCols != pTableMeta->tableInfo.numOfColumns) {
    return true;
  }

  for (int i = 0; i < pSW->nCols; i++) {
    char*   fieldName = pSW->pSchema[i].name;
    int8_t* fields = firstColSchema + (size_t)i * colSchemaSize;
    bool    matched = false;

    for (int j = 0; j < pTableMeta->tableInfo.numOfColumns; j++) {
      SSchema* pColSchema = &pTableMeta->schema[j];
      if (strcmp(pColSchema->name, fieldName) != 0) {
        continue;
      }
      SSchemaExt* pColExtSchema = &pTableMeta->schemaExt[j];
      if (checkSchema(pColSchema, pColExtSchema, fields, NULL, 0) != 0) {
        return true;
      }
      matched = true;
      break;
    }

    // column is unknown to the cached meta
    if (!matched) return true;
  }
  return false;
}
1331

1332
/*
 * Look up (or create and register) the per-connection cache entry in writeRawCache, keyed by
 * the pointer value `key`, and hand its three hash tables back through the out-parameters.
 *
 * On a miss three new hashes are created, tmqFreeMeta is installed as the free function of
 * *pMetaHash, and a rawCacheInfo holding the three pointers is stored in writeRawCache.
 *
 * Returns 0 on success. On failure the hashes created here are cleaned up and all three
 * out-parameters are left NULL.
 */
static int32_t getRawCache(SHashObj** pVgHash, SHashObj** pNameHash, SHashObj** pMetaHash, void* key) {
  if (pVgHash == NULL || pNameHash == NULL || pMetaHash == NULL || key == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  RAW_LOG_START

  // Start from a known state so the error path below never cleans up a value the caller
  // happened to leave in the out-parameters.
  *pVgHash = NULL;
  *pNameHash = NULL;
  *pMetaHash = NULL;

  void* cacheInfo = taosHashGet(writeRawCache, &key, POINTER_BYTES);
  if (cacheInfo == NULL) {
    *pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
    RAW_NULL_CHECK(*pVgHash);
    *pNameHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
    RAW_NULL_CHECK(*pNameHash);
    *pMetaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
    RAW_NULL_CHECK(*pMetaHash);
    taosHashSetFreeFp(*pMetaHash, tmqFreeMeta);
    rawCacheInfo info = {*pVgHash, *pNameHash, *pMetaHash};
    RAW_RETURN_CHECK(taosHashPut(writeRawCache, &key, POINTER_BYTES, &info, sizeof(rawCacheInfo)));
  } else {
    rawCacheInfo* info = (rawCacheInfo*)cacheInfo;
    *pVgHash = info->pVgHash;
    *pNameHash = info->pNameHash;
    *pMetaHash = info->pMetaHash;
  }

end:
  if (code != 0) {
    // Failure can only happen on the "create" path, so these are hashes built above (or NULL).
    taosHashCleanup(*pMetaHash);
    taosHashCleanup(*pNameHash);
    taosHashCleanup(*pVgHash);
    // do not hand dangling pointers back to the caller
    *pMetaHash = NULL;
    *pNameHash = NULL;
    *pVgHash = NULL;
  }
  RAW_LOG_END
  return code;
}
1367

1368
/*
 * Create a synchronous request object for the connection behind `taos`, fetch the catalog
 * handle of its cluster and fill `conn` (transporter, request id, request ref id, mgmt epset).
 *
 * Fails with TSDB_CODE_PAR_DB_NOT_SPECIFIED when the new request has no database set.
 * The request stored in *pRequest is not released here on failure.
 */
static int32_t buildRawRequest(TAOS* taos, SRequestObj** pRequest, SCatalog** pCatalog, SRequestConnInfo* conn) {
  if (!(taos != NULL && pRequest != NULL && pCatalog != NULL && conn != NULL)) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t      code = 0;
  int32_t      lino = 0;
  SRequestObj* req = NULL;
  RAW_LOG_START
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, pRequest, 0));

  req = *pRequest;
  req->syncQuery = true;
  if (req->pDb == NULL) {
    uError("%s no database selected", __func__);
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  RAW_RETURN_CHECK(catalogGetHandle(req->pTscObj->pAppInfo->clusterId, pCatalog));
  conn->requestId = req->requestId;
  conn->requestObjRefId = req->self;
  conn->pTrans = req->pTscObj->pAppInfo->pTransporter;
  conn->mgmtEps = getEpSet_s(&req->pTscObj->pAppInfo->mgmtEp);

end:
  RAW_LOG_END
  return code;
}
1394

1395
typedef int32_t _raw_decode_func_(SDecoder* pDecoder, SMqDataRsp* pRsp);
1396
static int32_t  decodeRawData(SDecoder* decoder, void* data, uint32_t dataLen, _raw_decode_func_ func,
24,255✔
1397
                              SMqRspObj* rspObj) {
1398
  if (decoder == NULL || data == NULL || func == NULL || rspObj == NULL) {
24,255✔
UNCOV
1399
    uError("invalid parameter in %s", __func__);
×
1400
    return TSDB_CODE_INVALID_PARA;
×
1401
  }
1402
  int8_t dataVersion = *(int8_t*)data;
24,255✔
1403
  if (dataVersion >= MQ_DATA_RSP_VERSION) {
24,255✔
1404
    data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t));
24,255✔
1405
    if (dataLen < sizeof(int8_t) + sizeof(int32_t)) {
24,255✔
UNCOV
1406
      return TSDB_CODE_INVALID_PARA;
×
1407
    }
1408
    dataLen -= sizeof(int8_t) + sizeof(int32_t);
24,255✔
1409
  }
1410

1411
  rspObj->resIter = -1;
24,255✔
1412
  tDecoderInit(decoder, data, dataLen);
24,255✔
1413
  int32_t code = func(decoder, &rspObj->dataRsp);
24,255✔
1414
  if (code != 0) {
24,255✔
UNCOV
1415
    SET_ERROR_MSG("decode mq taosx data rsp failed");
×
1416
  }
1417
  return code;
24,255✔
1418
}
1419

1420
static int32_t processCacheMeta(SHashObj* pVgHash, SHashObj* pNameHash, SHashObj* pMetaHash,
54,915✔
1421
                                SVCreateTbReq* pCreateReqDst, SCatalog* pCatalog, SRequestConnInfo* conn, SName* pName,
1422
                                STableMeta** pMeta, SSchemaWrapper* pSW, void* rawData, int32_t retry) {
1423
  if (pVgHash == NULL || pNameHash == NULL || pMetaHash == NULL || pCatalog == NULL || conn == NULL || pName == NULL ||
54,915✔
1424
      pMeta == NULL) {
UNCOV
1425
    uError("invalid parameter in %s", __func__);
×
1426
    return TSDB_CODE_INVALID_PARA;
×
1427
  }
1428
  int32_t code = 0;
54,915✔
1429
  int32_t lino = 0;
54,915✔
1430
  RAW_LOG_START
54,915✔
1431
  STableMeta* pTableMeta = NULL;
54,915✔
1432
  tbInfo*     tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname));
54,915✔
1433
  uDebug("%s tname:%s, cacheHit:%d, retry:%d, hasCreateReq:%d", __func__,
54,915✔
1434
         pName->tname, tmpInfo != NULL, retry, pCreateReqDst != NULL);
1435
  if (tmpInfo == NULL || retry > 0) {
54,915✔
1436
    tbInfo info = {0};
50,792✔
1437

1438
    RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, conn, pName, &info.vgInfo));
50,792✔
1439
    if (pCreateReqDst && tmpInfo == NULL) {  // change stable name to get meta
50,792✔
1440
      tstrncpy(pName->tname, pCreateReqDst->ctb.stbName, TSDB_TABLE_NAME_LEN);
5,235✔
1441
    }
1442
    RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, conn, pName, &pTableMeta));
50,792✔
1443
    info.uid = pTableMeta->uid;
49,222✔
1444
    if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
49,222✔
1445
      info.suid = pTableMeta->suid;
37,728✔
1446
    } else {
1447
      info.suid = pTableMeta->uid;
11,494✔
1448
    }
1449
    code = taosHashPut(pMetaHash, &info.suid, LONG_BYTES, &pTableMeta, POINTER_BYTES);
49,222✔
1450
    RAW_RETURN_CHECK(code);
49,222✔
1451

1452
    uDebug("put table meta to hash1, suid:%" PRId64 ", metaHashSIze:%d, nameHashSize:%d, vgHashSize:%d", info.suid,
49,222✔
1453
           taosHashGetSize(pMetaHash), taosHashGetSize(pNameHash), taosHashGetSize(pVgHash));
1454
    if (pCreateReqDst) {
49,222✔
1455
      pTableMeta->vgId = info.vgInfo.vgId;
5,235✔
1456
      pTableMeta->uid = pCreateReqDst->uid;
5,235✔
1457
      pCreateReqDst->ctb.suid = pTableMeta->suid;
5,235✔
1458
    }
1459

1460
    RAW_RETURN_CHECK(taosHashPut(pNameHash, pName->tname, strlen(pName->tname), &info, sizeof(tbInfo)));
49,222✔
1461
    tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname));
49,222✔
1462
    RAW_RETURN_CHECK(
49,222✔
1463
        taosHashPut(pVgHash, &info.vgInfo.vgId, sizeof(info.vgInfo.vgId), &info.vgInfo, sizeof(SVgroupInfo)));
1464
  }
1465

1466
  if (pTableMeta == NULL || retry > 0) {
53,345✔
1467
    STableMeta** pTableMetaTmp = (STableMeta**)taosHashGet(pMetaHash, &tmpInfo->suid, LONG_BYTES);
4,123✔
1468
    if (pTableMetaTmp == NULL || retry > 0 || needRefreshMeta(rawData, *pTableMetaTmp, pSW)) {
4,123✔
1469
      RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, conn, pName, &pTableMeta));
1,581✔
1470
      code = taosHashPut(pMetaHash, &tmpInfo->suid, LONG_BYTES, &pTableMeta, POINTER_BYTES);
1,581✔
1471
      RAW_RETURN_CHECK(code);
1,581✔
1472
      uDebug("put table meta to hash2, suid:%" PRId64 ", metaHashSIze:%d, nameHashSize:%d, vgHashSize:%d",
1,581✔
1473
             tmpInfo->suid, taosHashGetSize(pMetaHash), taosHashGetSize(pNameHash), taosHashGetSize(pVgHash));
1474
    } else {
1475
      pTableMeta = *pTableMetaTmp;
2,542✔
1476
      pTableMeta->uid = tmpInfo->uid;
2,542✔
1477
      pTableMeta->vgId = tmpInfo->vgInfo.vgId;
2,542✔
1478
    }
1479
  }
1480
  *pMeta = pTableMeta;
53,345✔
1481
  pTableMeta = NULL;
53,345✔
1482

1483
end:
54,915✔
1484
  taosMemoryFree(pTableMeta);
54,915✔
1485
  RAW_LOG_END
54,915✔
1486
  return code;
54,915✔
1487
}
1488

1489
static int32_t tmqWriteRawCommon(TAOS* taos, void* data, uint32_t dataLen,
24,255✔
1490
                                 _raw_decode_func_ decodeFunc, bool withMeta) {
1491
  if (taos == NULL || data == NULL) {
24,255✔
UNCOV
1492
    uError("invalid parameter in %s, taos:%p, data:%p, withMeta:%d", __func__, taos, data, withMeta);
×
1493
    return TSDB_CODE_INVALID_PARA;
×
1494
  }
1495
  int32_t   code = TSDB_CODE_SUCCESS;
24,255✔
1496
  int32_t   lino = 0;
24,255✔
1497
  SQuery*   pQuery = NULL;
24,255✔
1498
  SMqRspObj rspObj = {0};
24,255✔
1499
  SDecoder  decoder = {0};
24,255✔
1500
  SHashObj* pCreateTbHash = NULL;
24,255✔
1501

1502
  SRequestObj*     pRequest = NULL;
24,255✔
1503
  SCatalog*        pCatalog = NULL;
24,255✔
1504
  SRequestConnInfo conn = {0};
24,255✔
1505
  RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
24,255✔
1506
  uDebug(LOG_ID_TAG " write raw %s, data:%p, dataLen:%d", LOG_ID_VALUE, withMeta ? "metadata" : "data", data, dataLen);
24,255✔
1507
  RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, decodeFunc, &rspObj));
24,255✔
1508

1509
  if (withMeta) {
24,255✔
1510
    pCreateTbHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
3,252✔
1511
    RAW_NULL_CHECK(pCreateTbHash);
3,252✔
1512
    RAW_RETURN_CHECK(buildCreateTbMap(&rspObj.dataRsp, pCreateTbHash));
3,252✔
1513
  }
1514

1515
  SHashObj* pVgHash = NULL;
24,255✔
1516
  SHashObj* pNameHash = NULL;
24,255✔
1517
  SHashObj* pMetaHash = NULL;
24,255✔
1518
  RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));
24,255✔
1519
  int retry = 0;
24,255✔
1520
  while (1) {
1521
    RAW_RETURN_CHECK(smlInitHandle(&pQuery));
24,255✔
1522
    uDebug(LOG_ID_TAG " write raw %s block num:%d, retry:%d", LOG_ID_VALUE,
24,255✔
1523
           withMeta ? "metadata" : "data", rspObj.dataRsp.blockNum, retry);
1524
    while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
79,170✔
1525
      const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
54,915✔
1526
      RAW_NULL_CHECK(tbName);
54,915✔
1527
      SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
54,915✔
1528
      RAW_NULL_CHECK(pSW);
54,915✔
1529
      void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
54,915✔
1530
      RAW_NULL_CHECK(pRetrieve);
54,915✔
1531
      void* rawData = getRawDataFromRes(pRetrieve);
54,915✔
1532
      RAW_NULL_CHECK(rawData);
54,915✔
1533

1534
      uTrace(LOG_ID_TAG " write raw %s block[%d] tbname:%s, schemaCols:%d", LOG_ID_VALUE,
54,915✔
1535
             withMeta ? "metadata" : "data", rspObj.resIter, tbName, pSW->nCols);
1536
      SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
54,915✔
1537
      tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
54,915✔
1538
      tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN);
54,915✔
1539

1540
      SVCreateTbReq* pCreateReqDst = NULL;
54,915✔
1541
      if (pCreateTbHash) {
54,915✔
1542
        pCreateReqDst = (SVCreateTbReq*)taosHashGet(pCreateTbHash, pName.tname, strlen(pName.tname));
8,748✔
1543
      }
1544
      STableMeta* pTableMeta = NULL;
54,915✔
1545
      code = processCacheMeta(pVgHash, pNameHash, pMetaHash, pCreateReqDst, pCatalog, &conn, &pName,
54,915✔
1546
                              &pTableMeta, pSW, rawData, retry);
1547
      PROCESS_TABLE_NOT_EXIST(code, tbName)
54,915✔
1548
      char err[ERR_MSG_LEN] = {0};
53,345✔
1549
      code = rawBlockBindData(pQuery, pTableMeta, rawData, pCreateReqDst, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true);
53,345✔
1550
      if (code != TSDB_CODE_SUCCESS) {
53,345✔
UNCOV
1551
        uError(LOG_ID_TAG " rawBlockBindData failed, table:%s, err:%s, code:%s", LOG_ID_VALUE, pName.tname, err, tstrerror(code));
×
1552
        SET_ERROR_MSG("table:%s, err:%s", pName.tname, err);
×
1553
        goto end;
×
1554
      }
1555
    }
1556
    if (taosHashGetSize(pVgHash) == 0) {
24,255✔
1557
      goto end;
1,256✔
1558
    }
1559
    RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
22,999✔
1560
    launchQueryImpl(pRequest, pQuery, true, NULL);
22,999✔
1561
    code = pRequest->code;
22,999✔
1562

1563
    if (NEED_CLIENT_HANDLE_ERROR(code) && retry++ < 3) {
22,999✔
UNCOV
1564
      uInfo(LOG_ID_TAG " write raw retry:%d/3, code:%d, msg:%s", LOG_ID_VALUE, retry, code, tstrerror(code));
×
1565
      qDestroyQuery(pQuery);
×
1566
      pQuery = NULL;
×
1567
      rspObj.resIter = -1;
×
1568
      continue;
×
1569
    }
1570
    break;
22,999✔
1571
  }
1572
  uDebug(LOG_ID_TAG " write raw %s return, msg:%s", LOG_ID_VALUE, withMeta ? "metadata" : "data", tstrerror(code));
22,999✔
1573

1574
end:
24,255✔
1575
  if (withMeta) {
24,255✔
1576
    tDeleteSTaosxRsp(&rspObj.dataRsp);
3,252✔
1577
    void* pIter = taosHashIterate(pCreateTbHash, NULL);
3,252✔
1578
    while (pIter) {
8,487✔
1579
      tDestroySVCreateTbReq(pIter, TSDB_MSG_FLG_DECODE);
5,235✔
1580
      pIter = taosHashIterate(pCreateTbHash, pIter);
5,235✔
1581
    }
1582
    taosHashCleanup(pCreateTbHash);
3,252✔
1583
  } else {
1584
    tDeleteMqDataRsp(&rspObj.dataRsp);
21,003✔
1585
  }
1586
  tDecoderClear(&decoder);
24,255✔
1587
  qDestroyQuery(pQuery);
24,255✔
1588
  destroyRequest(pRequest);
24,255✔
1589
  RAW_LOG_END
24,255✔
1590
  return code;
24,255✔
1591
}
1592

1593
/* Write a plain TMQ data payload: tmqWriteRawCommon() with the data decoder and withMeta off. */
static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
  const bool withMeta = false;
  int32_t    code = tmqWriteRawCommon(taos, data, dataLen, tDecodeMqDataRsp, withMeta);
  return code;
}
1596

1597
/* Write a TMQ metadata payload: tmqWriteRawCommon() with the taosx decoder and withMeta on. */
static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
  const bool withMeta = true;
  int32_t    code = tmqWriteRawCommon(taos, data, dataLen, tDecodeSTaosxRsp, withMeta);
  return code;
}
1600

UNCOV
1601
/*
 * Write a RES_TYPE__TMQ_RAWDATA payload into the connection `taos`.
 *
 * The payload is decoded with tDecodeMqDataRsp. Every block is resolved to a table meta through
 * processCacheMeta() (without schema or create-table information) and bound with
 * rawBlockBindRawData() into per-vgroup data blocks, which are then sent with launchQueryImpl().
 * The bind-and-launch pass is repeated while NEED_CLIENT_HANDLE_ERROR() holds for the result
 * code and the retry budget (3) is not used up.
 *
 * Returns the final result code. All locally created resources are released before returning.
 */
static int32_t tmqWriteRawRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
  if (taos == NULL || data == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t   code = TSDB_CODE_SUCCESS;
  int32_t   lino = 0;
  SQuery*   pQuery = NULL;
  SHashObj* pVgroupHash = NULL;
  SMqRspObj rspObj = {0};
  SDecoder  decoder = {0};

  SRequestObj*     pRequest = NULL;
  SCatalog*        pCatalog = NULL;
  SRequestConnInfo conn = {0};

  RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
  uDebug(LOG_ID_TAG " write raw rawdata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
  RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeMqDataRsp, &rspObj));

  SHashObj* pVgHash = NULL;
  SHashObj* pNameHash = NULL;
  SHashObj* pMetaHash = NULL;
  RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));
  int retry = 0;
  while (1) {
    RAW_RETURN_CHECK(smlInitHandle(&pQuery));
    uDebug(LOG_ID_TAG " write raw rawdata block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);
    SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)(pQuery)->pRoot;
    // scratch map used only while binding this pass; released before the query is launched
    pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
    RAW_NULL_CHECK(pVgroupHash);
    pStmt->pVgDataBlocks = taosArrayInit(8, POINTER_BYTES);
    RAW_NULL_CHECK(pStmt->pVgDataBlocks);

    while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
      const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
      RAW_NULL_CHECK(tbName);
      void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
      RAW_NULL_CHECK(pRetrieve);
      void* rawData = getRawDataFromRes(pRetrieve);
      RAW_NULL_CHECK(rawData);

      uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
      SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
      tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
      tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN);

      // find schema data info
      STableMeta* pTableMeta = NULL;
      code = processCacheMeta(pVgHash, pNameHash, pMetaHash, NULL, pCatalog, &conn, &pName, &pTableMeta, NULL,
                              NULL, retry);
      PROCESS_TABLE_NOT_EXIST(code, tbName)
      code = rawBlockBindRawData(pVgroupHash, pStmt->pVgDataBlocks, pTableMeta, rawData);
      if (code != TSDB_CODE_SUCCESS) {
        // rawBlockBindRawData() has no error-text output, so report the code's message instead
        // of an always-empty buffer
        uError(LOG_ID_TAG " rawBlockBindRawData failed, table:%s, code:%s", LOG_ID_VALUE, pName.tname,
               tstrerror(code));
        SET_ERROR_MSG("table:%s, err:%s", pName.tname, tstrerror(code));
        goto end;
      }
    }
    taosHashCleanup(pVgroupHash);
    pVgroupHash = NULL;

    RAW_RETURN_CHECK(smlBuildOutputRaw(pQuery, pVgHash));
    launchQueryImpl(pRequest, pQuery, true, NULL);
    code = pRequest->code;

    if (NEED_CLIENT_HANDLE_ERROR(code) && retry++ < 3) {
      // same tagged format as the retry log in tmqWriteRawCommon
      uInfo(LOG_ID_TAG " write raw retry:%d/3, code:%d, msg:%s", LOG_ID_VALUE, retry, code, tstrerror(code));
      qDestroyQuery(pQuery);
      pQuery = NULL;
      rspObj.resIter = -1;
      continue;
    }
    break;
  }
  uDebug(LOG_ID_TAG " write raw rawdata return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  tDeleteMqDataRsp(&rspObj.dataRsp);
  tDecoderClear(&decoder);
  qDestroyQuery(pQuery);
  taosHashCleanup(pVgroupHash);
  destroyRequest(pRequest);
  RAW_LOG_END
  return code;
}
1687

1688
static int32_t getOffSetLen(const SMqDataRsp* pRsp) {
24,303✔
1689
  if (pRsp == NULL) {
24,303✔
UNCOV
1690
    uError("invalid parameter in %s", __func__);
×
1691
    return TSDB_CODE_INVALID_PARA;
×
1692
  }
1693
  int32_t pos = 0;
24,303✔
1694
  int32_t code = 0;
24,303✔
1695
  int32_t lino = 0;
24,303✔
1696
  RAW_LOG_START
24,303✔
1697
  SEncoder coder = {0};
24,303✔
1698
  tEncoderInit(&coder, NULL, 0);
24,303✔
1699
  RAW_RETURN_CHECK(tEncodeSTqOffsetVal(&coder, &pRsp->reqOffset));
24,303✔
1700
  RAW_RETURN_CHECK(tEncodeSTqOffsetVal(&coder, &pRsp->rspOffset));
24,303✔
1701
  pos = coder.pos;
24,303✔
1702
  tEncoderClear(&coder);
24,303✔
1703

1704
end:
24,303✔
1705
  if (code != 0) {
24,303✔
UNCOV
1706
    uError("getOffSetLen failed, code:%d", code);
×
1707
    return code;
×
1708
  } else {
1709
    uDebug("getOffSetLen success, len:%d", pos);
24,303✔
1710
    return pos;
24,303✔
1711
  }
1712
}
1713

1714
typedef int32_t __encode_func__(SEncoder* pEncoder, const SMqDataRsp* pRsp);
1715
static int32_t  encodeMqDataRsp(__encode_func__* encodeFunc, SMqDataRsp* rspObj, tmq_raw_data* raw) {
24,303✔
1716
  if (raw == NULL || encodeFunc == NULL || rspObj == NULL) {
24,303✔
UNCOV
1717
    uError("invalid parameter in %s", __func__);
×
1718
    return TSDB_CODE_INVALID_PARA;
×
1719
  }
1720
  uint32_t len = 0;
24,303✔
1721
  int32_t  code = 0;
24,303✔
1722
  int32_t  lino = 0;
24,303✔
1723
  SEncoder encoder = {0};
24,303✔
1724
  void*    buf = NULL;
24,303✔
1725
  tEncodeSize(encodeFunc, rspObj, len, code);
24,303✔
1726
  RAW_FALSE_CHECK(code >= 0);
24,303✔
1727
  len += sizeof(int8_t) + sizeof(int32_t);
24,303✔
1728
  buf = taosMemoryCalloc(1, len);
24,303✔
1729
  RAW_NULL_CHECK(buf);
24,303✔
1730
  tEncoderInit(&encoder, buf, len);
24,303✔
1731
  RAW_RETURN_CHECK(tEncodeI8(&encoder, MQ_DATA_RSP_VERSION));
24,303✔
1732
  int32_t offsetLen = getOffSetLen(rspObj);
24,303✔
1733
  RAW_FALSE_CHECK(offsetLen > 0);
24,303✔
1734
  RAW_RETURN_CHECK(tEncodeI32(&encoder, offsetLen));
24,303✔
1735
  RAW_RETURN_CHECK(encodeFunc(&encoder, rspObj));
24,303✔
1736

1737
  raw->raw = buf;
24,303✔
1738
  buf = NULL;
24,303✔
1739
  raw->raw_len = len;
24,303✔
1740

1741
end:
24,303✔
1742
  RAW_LOG_END
24,303✔
1743
  return code;
24,303✔
1744
}
1745

1746
/*
 * Fill `raw` with the raw form of the TMQ result `res`.
 *
 *  - meta / batch-meta results: raw->raw points at the buffer held by `res`.
 *  - data / metadata results: the response is encoded into a new buffer via encodeMqDataRsp().
 *  - raw-data results: the buffer pointer is moved out of `res` (its rawData field is set to NULL).
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments, TSDB_CODE_TMQ_INVALID_MSG for an
 * unrecognised result type, otherwise 0 or the encoder's error code.
 */
int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
  if (res == NULL || raw == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  RAW_LOG_START

  const tmq_raw_data emptyRaw = {0};
  *raw = emptyRaw;

  SMqRspObj* rspObj = (SMqRspObj*)res;
  uDebug("tmq_get_raw resType:%d", rspObj->resType);

  if (TD_RES_TMQ_META(res)) {
    raw->raw = rspObj->metaRsp.metaRsp;
    // a negative length is reported as 0
    if (rspObj->metaRsp.metaRspLen >= 0) {
      raw->raw_len = rspObj->metaRsp.metaRspLen;
    } else {
      raw->raw_len = 0;
    }
    raw->raw_type = rspObj->metaRsp.resMsgType;
    uDebug("tmq get raw type meta:%p", raw);
  } else if (TD_RES_TMQ(res)) {
    RAW_RETURN_CHECK(encodeMqDataRsp(tEncodeMqDataRsp, &rspObj->dataRsp, raw));
    raw->raw_type = RES_TYPE__TMQ;
    uDebug("tmq get raw type data:%p", raw);
  } else if (TD_RES_TMQ_METADATA(res)) {
    RAW_RETURN_CHECK(encodeMqDataRsp(tEncodeSTaosxRsp, &rspObj->dataRsp, raw));
    raw->raw_type = RES_TYPE__TMQ_METADATA;
    uDebug("tmq get raw type metadata:%p", raw);
  } else if (TD_RES_TMQ_BATCH_META(res)) {
    raw->raw = rspObj->batchMetaRsp.pMetaBuff;
    raw->raw_len = rspObj->batchMetaRsp.metaBuffLen;
    raw->raw_type = rspObj->resType;
    uDebug("tmq get raw batch meta:%p", raw);
  } else if (TD_RES_TMQ_RAW(res)) {
    // move the buffer out of the result object
    raw->raw = rspObj->dataRsp.rawData;
    rspObj->dataRsp.rawData = NULL;
    raw->raw_len = rspObj->dataRsp.len;
    raw->raw_type = rspObj->resType;
    uDebug("tmq get raw raw:%p", raw);
  } else {
    uError("tmq get raw error type:%d", *(int8_t*)res);
    code = TSDB_CODE_TMQ_INVALID_MSG;
  }

end:
  RAW_LOG_END
  return code;
}
1790

1791
/*
 * Release the buffer referenced by `raw` according to its type, then clear the error message
 * buffer terrMsg.
 *
 *  - RES_TYPE__TMQ / RES_TYPE__TMQ_METADATA: raw.raw itself is freed.
 *  - RES_TYPE__TMQ_RAWDATA: the pointer sizeof(SMqRspHead) bytes before raw.raw is freed
 *    (presumably the start of the original allocation; confirm against the producer).
 *  - any other type: nothing is freed.
 */
void tmq_free_raw(tmq_raw_data raw) {
  uDebug("tmq free raw data type:%d", raw.raw_type);
  switch (raw.raw_type) {
    case RES_TYPE__TMQ:
    case RES_TYPE__TMQ_METADATA:
      taosMemoryFree(raw.raw);
      break;
    case RES_TYPE__TMQ_RAWDATA:
      if (raw.raw != NULL) {
        taosMemoryFree(POINTER_SHIFT(raw.raw, -sizeof(SMqRspHead)));
      }
      break;
    default:
      break;
  }
  (void)memset(terrMsg, 0, ERR_MSG_LEN);
}
108,851✔
1800

1801
static int32_t writeRawInit() {
149,348✔
1802
  while (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_START) {
156,851✔
1803
    int8_t old = atomic_val_compare_exchange_8(&initFlag, 0, 1);
7,503✔
1804
    if (old == 0) {
7,503✔
1805
      int32_t code = initRawCacheHash();
7,503✔
1806
      if (code != 0) {
7,503✔
UNCOV
1807
        uError("tmq writeRawImpl init error:%d", code);
×
1808
        atomic_store_8(&initedFlag, WRITE_RAW_INIT_FAIL);
×
1809
        return code;
×
1810
      }
1811
      atomic_store_8(&initedFlag, WRITE_RAW_INIT_OK);
7,503✔
1812
    }
1813
  }
1814

1815
  if (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_FAIL) {
149,348✔
UNCOV
1816
    return TSDB_CODE_INTERNAL_ERROR;
×
1817
  }
1818
  return 0;
149,348✔
1819
}
1820

1821
/*
 * Dispatch one raw payload to the handler matching `type`.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments or an unknown type,
 * TSDB_CODE_INTERNAL_ERROR when writeRawInit() fails, otherwise the handler's result.
 */
static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type) {
  if (taos == NULL || buf == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  if (writeRawInit() != 0) {
    return TSDB_CODE_INTERNAL_ERROR;
  }

  uDebug("writeRawImpl type:%d, buf:%p, len:%d", type, buf, len);

  int32_t code = TSDB_CODE_INVALID_PARA;
  switch (type) {
    case TDMT_VND_CREATE_STB:
    case TDMT_VND_ALTER_STB:
      code = taosCreateStb(taos, buf, len);
      break;
    case TDMT_VND_DROP_STB:
      code = taosDropStb(taos, buf, len);
      break;
    case TDMT_VND_CREATE_TABLE:
      code = taosCreateTable(taos, buf, len);
      break;
    case TDMT_VND_ALTER_TABLE:
      code = taosAlterTable(taos, buf, len);
      break;
    case TDMT_VND_DROP_TABLE:
      code = taosDropTable(taos, buf, len);
      break;
    case TDMT_VND_DELETE:
      code = taosDeleteData(taos, buf, len);
      break;
    case RES_TYPE__TMQ_METADATA:
      code = tmqWriteRawMetaDataImpl(taos, buf, len);
      break;
    case RES_TYPE__TMQ_RAWDATA:
      code = tmqWriteRawRawDataImpl(taos, buf, len);
      break;
    case RES_TYPE__TMQ:
      code = tmqWriteRawDataImpl(taos, buf, len);
      break;
    case RES_TYPE__TMQ_BATCH_META:
      code = tmqWriteBatchMetaDataImpl(taos, buf, len);
      break;
    default:
      uError("writeRawImpl unknown type:%d", type);
      code = TSDB_CODE_INVALID_PARA;
      break;
  }
  return code;
}
1858

1859
/*
 * Public entry point: write a raw TMQ payload into the connection `taos`.
 *
 * Rejects a NULL connection, a NULL buffer or a non-positive length with
 * TSDB_CODE_INVALID_PARA (and sets the error message). Otherwise clears the global error
 * message, forwards to writeRawImpl() and returns its result.
 */
int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {
  const bool badArgs = (taos == NULL) || (raw.raw == NULL) || (raw.raw_len <= 0);
  if (badArgs) {
    SET_ERROR_MSG("taos:%p or data:%p is NULL or raw_len <= 0", taos, raw.raw);
    return TSDB_CODE_INVALID_PARA;
  }

  taosClearErrMsg();  // clear global error message
  uDebug("tmq_write_raw connId:0x%" PRIx64 ", raw_type:%d, raw_len:%d", *(int64_t*)taos, raw.raw_type, raw.raw_len);

  const int32_t code = writeRawImpl(taos, raw.raw, raw.raw_len, raw.raw_type);
  if (code == TSDB_CODE_SUCCESS) {
    return code;
  }
  uError("tmq_write_raw connId:0x%" PRIx64 " failed, raw_type:%d, code:%s", *(int64_t*)taos, raw.raw_type, tstrerror(code));
  return code;
}
1873

1874
/*
 * Write a batch-meta payload: decode the batch, then decode each item (skipping its
 * SMqRspHead) as an SMqMetaRsp and feed it to writeRawImpl() with the item's own message type.
 *
 * Processing stops at the first failing item. An item shorter than SMqRspHead is rejected with
 * TSDB_CODE_TMQ_INVALID_MSG instead of being decoded with an underflowed length.
 *
 * Returns 0 when every item was written, otherwise the first error.
 */
static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, uint32_t metaLen) {
  if (taos == NULL || meta == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SMqBatchMetaRsp rsp = {0};
  SMqMetaRsp      metaRsp = {0};  // item currently being processed; all-zero between items
  SDecoder        coder = {0};
  SDecoder        metaCoder = {0};
  int32_t         code = TSDB_CODE_SUCCESS;
  int32_t         lino = 0;

  RAW_LOG_START
  // decode and process req
  tDecoderInit(&coder, meta, metaLen);
  RAW_RETURN_CHECK(tDecodeMqBatchMetaRsp(&coder, &rsp));
  int32_t num = taosArrayGetSize(rsp.batchMetaReq);
  uDebug("%s batch meta count:%d, metaLen:%d", __func__, num, metaLen);
  for (int32_t i = 0; i < num; i++) {
    int32_t* len = taosArrayGet(rsp.batchMetaLen, i);
    RAW_NULL_CHECK(len);
    void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i);
    RAW_NULL_CHECK(tmpBuf);

    // the item must at least hold the response head that is skipped below
    if (*len < (int32_t)sizeof(SMqRspHead)) {
      uError("%s batch item %d/%d too short, len:%d", __func__, i, num, *len);
      code = TSDB_CODE_TMQ_INVALID_MSG;
      goto end;
    }

    tDecoderInit(&metaCoder, POINTER_SHIFT(tmpBuf, sizeof(SMqRspHead)), *len - sizeof(SMqRspHead));
    // on failure `end` releases whatever the decoder already stored in metaRsp
    RAW_RETURN_CHECK(tDecodeMqMetaRsp(&metaCoder, &metaRsp));
    uDebug("%s processing batch item %d/%d, resMsgType:%d, metaRspLen:%d", __func__, i, num,
           metaRsp.resMsgType, metaRsp.metaRspLen);
    code = writeRawImpl(taos, metaRsp.metaRsp, metaRsp.metaRspLen, metaRsp.resMsgType);

    const int32_t itemMsgType = metaRsp.resMsgType;  // keep for logging past the reset
    tDeleteMqMetaRsp(&metaRsp);
    metaRsp = (SMqMetaRsp){0};  // so the cleanup at `end` never sees a released item
    tDecoderClear(&metaCoder);
    if (code != TSDB_CODE_SUCCESS) {
      uError("%s batch item %d/%d failed, resMsgType:%d, code:%s", __func__, i, num,
             itemMsgType, tstrerror(code));
      goto end;
    }
  }

end:
  tDeleteMqMetaRsp(&metaRsp);
  tDecoderClear(&coder);
  tDecoderClear(&metaCoder);
  tDeleteMqBatchMetaRsp(&rsp);
  RAW_LOG_END
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc