• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

statuscompliance / status-backend / 20368835235

19 Dec 2025 11:35AM UTC coverage: 91.547% (+0.6%) from 90.985%
20368835235

push

github

alvarobernal2412
test(controllers): increase linker and databinder coverage

1505 of 1729 branches covered (87.04%)

Branch coverage included in aggregate %.

2827 of 3003 relevant lines covered (94.14%)

28.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.19
/src/controllers/linker.controller.js
1
import { models } from '../models/models.js';
2
import { getDatabinderCatalog } from '../config/databinder.js';
3
import logger from '../config/logger.js';
4
import { 
5
  generateInstanceId, 
6
  extractResultData,
7
  applyPropertyMapping,
8
  generateCorrelationIds,
9
  createTelemetryContext,
10
  normalizeName,
11
  sanitizeLinker,
12
  checkLinkerOwnership,
13
  validateLinkerInput,
14
  validateLinkerUpdateInput,
15
  validateDatasourcesExist,
16
  validateDatasourceConfigs,
17
  generateLinkerExecutionId,
18
  mergeDatasourceResults,
19
  createLinkerExecutionMetadata,
20
  normalizeDatasourceConfigs,
21
  createLinkerExecutionSummary,
22
  cacheLinkerExecution,
23
  getCachedLinkerExecution,
24
  invalidateLinkerCache
25
} from '../utils/databinder/index.js';
26

27
// Module-level DatasourceCatalog handle, resolved once when this file is loaded.
const datasourceCatalog = getDatabinderCatalog();
78✔
29

30
/**
 * Run every datasource attached to a linker and aggregate the outcomes.
 * Used both right after a linker is created and by the /execute endpoint.
 *
 * Datasources are executed one after another. A failure in one datasource is
 * recorded in its own result entry and does not stop the remaining ones.
 *
 * @param {Object} linker - The linker instance
 * @param {number} userId - The user ID; only datasources with this ownerId are loaded
 * @param {string} executionId - Execution ID for logging
 * @param {string} traceId - Trace ID for telemetry
 * @param {string} spanId - Span ID for telemetry
 * @param {Object} options - Options to pass to datasource methods; `methodName`, when present, selects the method and is not forwarded
 * @returns {Promise<Object>} Execution result with status, data, and metadata
 * @throws {Error} When an ID in linker.datasourceIds is absent from the datasources found for this user
 */
const executeLinkerInternal = async (linker, userId, executionId, traceId, spanId, options = {}) => {
  logger.debug(`[${executionId}] Starting internal linker execution`, {
    linkerId: linker.id,
    linkerName: linker.name,
    datasourceCount: linker.datasourceIds.length,
    userId,
    traceId,
    spanId
  });

  const executionStartTime = Date.now();

  // Fetch all datasources
  const datasources = await models.Datasource.findAll({
    where: {
      id: linker.datasourceIds,
      ownerId: userId
    }
  });

  if (datasources.length !== linker.datasourceIds.length) {
    const foundIds = new Set(datasources.map(ds => ds.id));
    const missingIds = linker.datasourceIds.filter(id => !foundIds.has(id));

    // A length mismatch alone does not prove a datasource is gone: a repeated
    // ID in linker.datasourceIds yields fewer rows than IDs. Fail only when an
    // ID is really absent from the query result.
    if (missingIds.length > 0) {
      throw new Error(`Some datasources are no longer available. Missing: ${missingIds.join(', ')}`);
    }
  }

  // Execute each datasource
  const results = [];

  for (const datasource of datasources) {
    try {
      const dsConfig = linker.datasourceConfigs?.[datasource.id];

      // Determine method name: runtime options > datasource config > linker default
      const methodName = options?.methodName || dsConfig?.methodConfig?.methodName || linker.defaultMethodName;

      // Merge options: runtime options override config options
      const configOptions = dsConfig?.methodConfig?.options || {};
      const methodOptions = { ...configOptions, ...options };

      // Remove methodName from options if present (it's not a method parameter)
      delete methodOptions.methodName;

      const propertyMapping = dsConfig?.propertyMapping || null;

      // Create datasource instance
      const instanceId = generateInstanceId(datasource.id, Date.now(), 'linker');
      const instance = datasourceCatalog.createDatasourceInstance(
        datasource.definitionId,
        datasource.config,
        instanceId
      );

      // Check if method exists
      if (!instance.methods[methodName]) {
        const availableMethods = instance.listMethods ? instance.listMethods() : Object.keys(instance.methods);
        results.push({
          datasourceId: datasource.id,
          datasourceName: datasource.name,
          success: false,
          error: `Method '${methodName}' not available. Available methods: ${availableMethods.join(', ')}`,
          data: null
        });
        continue;
      }

      // Execute method
      const callStartTime = Date.now();
      const result = await instance.methods[methodName](methodOptions);
      const callEndTime = Date.now();

      // Extract and transform data
      const extractedResult = extractResultData(result);
      const finalResult = propertyMapping ? applyPropertyMapping(extractedResult, propertyMapping) : extractedResult;

      results.push({
        datasourceId: datasource.id,
        datasourceName: datasource.name,
        definitionId: datasource.definitionId,
        methodUsed: methodName,
        success: true,
        error: null,
        data: finalResult,
        executionDuration: `${callEndTime - callStartTime}ms`,
        propertyMappingApplied: !!propertyMapping
      });

    } catch (error) {
      // One failing datasource is reported in its entry; the loop keeps going.
      logger.error(`Error executing datasource ${datasource.id} in linker ${linker.id}:`, error);
      results.push({
        datasourceId: datasource.id,
        datasourceName: datasource.name,
        success: false,
        error: error.message,
        data: null
      });
    }
  }

  // Determine overall execution status. A partially successful run is reported
  // as 'success', and so is a run with no results at all (every() is true on
  // an empty list); 'failure' means results exist and none succeeded.
  const allSuccessful = results.every(r => r.success);
  const anySuccessful = results.some(r => r.success);
  const overallStatus = allSuccessful || anySuccessful ? 'success' : 'failure';

  const executionEndTime = Date.now();

  // Always use 'indexed' merge strategy for caching
  const mergedData = mergeDatasourceResults(results, 'indexed');

  // Create execution metadata
  const executionMetadata = createLinkerExecutionMetadata({
    linkerId: linker.id,
    datasourceIds: linker.datasourceIds,
    executionId,
    startTime: executionStartTime,
    endTime: executionEndTime,
    results
  });

  // Create execution summary
  const executionSummary = createLinkerExecutionSummary(results);

  logger.debug(`[${executionId}] Internal linker execution completed`, {
    executionStatus: overallStatus,
    executionDuration: `${executionEndTime - executionStartTime}ms`,
    successfulDatasources: executionSummary.successful,
    failedDatasources: executionSummary.failed,
    traceId,
    spanId
  });

  return {
    overallStatus,
    mergedData,
    executionMetadata,
    executionSummary,
    detailedResults: results,
    executionStartTime,
    executionEndTime
  };
};
191

192
/**
 * Respond with the sanitized linkers whose ownerId matches the requesting user.
 * Sends 401 when the request carries no user ID and 500 when anything throws.
 */
export const listLinkers = async (req, res) => {
  try {
    const ownerId = req.user?.user_id;
    if (!ownerId) {
      return res.status(401).json({ message: 'Unauthorized' });
    }

    const ownedLinkers = await models.Linker.findAll({ where: { ownerId } });

    res.json(ownedLinkers.map((entry) => sanitizeLinker(entry)));
  } catch (error) {
    logger.error('Error listing linkers:', error);
    res.status(500).json({ message: 'Error listing linkers', error: error.message });
  }
};
208

209
/**
 * Respond with one linker, looked up by the `id` route parameter.
 * Sends 404 when checkLinkerOwnership rejects the linker/user pair and 500
 * when anything throws.
 */
export const getLinker = async (req, res) => {
  try {
    const requesterId = req.user?.user_id;
    const { id: linkerId } = req.params;

    const found = await models.Linker.findByPk(linkerId);
    const isOwner = checkLinkerOwnership(found, requesterId);

    if (!isOwner) {
      return res.status(404).json({ message: 'Linker not found or access denied' });
    }

    const payload = sanitizeLinker(found, true);
    res.json(payload);
  } catch (error) {
    logger.error('Error fetching linker:', error);
    res.status(500).json({ message: 'Error fetching linker', error: error.message });
  }
};
226

227
/**
 * Create a linker for the authenticated user, then run it once and cache the
 * result.
 *
 * Responses: 401 without a user ID, 400 when input / datasource / config
 * validation fails, 409 when the user already has a linker with the same
 * normalized name, 201 with the sanitized linker on success, 500 on
 * unexpected errors.
 *
 * The initial execution is best-effort: if it (or the bookkeeping around it)
 * fails, the error is logged and the linker is still returned with 201.
 */
export const createLinker = async (req, res) => {
  try {
    const userId = req.user?.user_id;
    if (!userId) return res.status(401).json({ message: 'Unauthorized' });

    const { name, defaultMethodName, datasourceIds, datasourceConfigs, description, environment } = req.body;

    // Validate input
    const validation = validateLinkerInput({ datasourceIds, defaultMethodName, datasourceConfigs });
    if (!validation.isValid) {
      return res.status(400).json({ error: validation.errors.join(', ') });
    }

    // Validate that all datasource IDs exist and belong to the user
    const dsValidation = await validateDatasourcesExist(datasourceIds, models.Datasource, userId);
    if (!dsValidation.isValid) {
      return res.status(400).json({ error: dsValidation.errors.join(', ') });
    }

    // Validate datasource configs structure (now mandatory)
    const configValidation = validateDatasourceConfigs(datasourceConfigs, datasourceIds);
    if (!configValidation.isValid) {
      return res.status(400).json({ error: configValidation.errors.join(', ') });
    }

    // Normalize and check for existing linker with same name
    const normalizedName = name ? normalizeName(name) : null;
    if (normalizedName) {
      const existing = await models.Linker.findOne({
        where: { name: normalizedName, ownerId: userId }
      });
      if (existing) {
        return res.status(409).json({ message: 'A linker with this name already exists.' });
      }
    }

    // Normalize datasource configs
    const normalizedConfigs = normalizeDatasourceConfigs(datasourceConfigs, datasourceIds);

    // Create linker
    const linkerData = {
      name: normalizedName,
      defaultMethodName: defaultMethodName || 'default',
      datasourceIds,
      datasourceConfigs: normalizedConfigs, // Always required, never null
      description: description || null,
      environment: environment || 'production',
      isActive: true,
      createdBy: req.user.username,
      version: 1,
      ownerId: userId,
    };

    const newLinker = await models.Linker.create(linkerData);

    // Execute linker immediately after creation and cache the results
    try {
      const { traceId, spanId } = generateCorrelationIds();
      const executionId = generateLinkerExecutionId(newLinker.id, Date.now());

      logger.info(`Executing linker immediately after creation: ${newLinker.id}`);

      const executionResult = await executeLinkerInternal(
        newLinker,
        userId,
        executionId,
        traceId,
        spanId,
        {}
      );

      // Update linker execution status
      await newLinker.update({
        executionStatus: executionResult.overallStatus,
        lastExecutedAt: new Date()
      });

      // Cache the execution result with 2 weeks expiration
      await cacheLinkerExecution(newLinker.id, executionResult.mergedData, {
        executionId,
        executionStatus: executionResult.overallStatus,
        executionSummary: executionResult.executionSummary,
        traceId,
        spanId
      });

      logger.info(`Linker ${newLinker.id} executed and cached successfully`, {
        linkerId: newLinker.id,
        executionStatus: executionResult.overallStatus
      });

    } catch (execError) {
      // Log error but don't fail the creation
      logger.error(`Error executing linker after creation: ${newLinker.id}`, execError);
      try {
        await newLinker.update({
          executionStatus: 'failure',
          lastExecutedAt: new Date()
        });
      } catch (statusError) {
        // The linker already exists at this point. Failing to record the
        // 'failure' status must not escape to the outer catch, which would
        // report the whole creation as a 500.
        logger.error(`Error updating linker status after failed execution: ${newLinker.id}`, statusError);
      }
    }

    const response = sanitizeLinker(newLinker, true);
    res.status(201).json({
      message: 'Linker created successfully',
      datasourceCount: datasourceIds.length,
      ...response
    });
  } catch (error) {
    logger.error('Error creating linker:', error);
    res.status(500).json({ message: 'Error creating linker', error: error.message });
  }
};
338

339
/**
 * Validate and prepare name update
 *
 * Normalizes the requested name and, when it differs from the linker's current
 * name, checks that the same user has no other linker with that name.
 *
 * @param {string} name - New name
 * @param {Object} linker - Current linker
 * @param {number} userId - User ID
 * @param {string} linkerId - Linker ID
 * @returns {Promise<Object>} `{ normalizedName, isValid: true }`, or
 *   `{ isValid: false, error: { status: 409, message } }` on a name clash
 */
const validateNameUpdate = async (name, linker, userId, linkerId) => {
  const normalizedName = normalizeName(name);

  if (normalizedName === linker.name) {
    return { normalizedName, isValid: true };
  }

  const existing = await models.Linker.findOne({
    where: { name: normalizedName, ownerId: userId }
  });

  // updateLinker passes linkerId straight from req.params (a string), while
  // existing.id comes from the model and may be numeric. Compare as strings so
  // the linker being renamed is never reported as a clash with itself.
  if (existing && String(existing.id) !== String(linkerId)) {
    return {
      isValid: false,
      error: { status: 409, message: 'A linker with this name already exists.' }
    };
  }

  return { normalizedName, isValid: true };
};
367

368
/**
 * Turn a requested datasourceIds change into update fields.
 *
 * @param {Array<string>} datasourceIds - New datasource IDs
 * @param {Object} linker - Current linker
 * @param {number} userId - User ID
 * @returns {Promise<Object>} On success: the new IDs, a bumped version, an
 *   execution status reset to 'not_executed', and a cache invalidation flag.
 *   On failure: `{ isValid: false, error: { status: 400, message } }`.
 */
const processDatasourceIdsUpdate = async (datasourceIds, linker, userId) => {
  const check = await validateDatasourcesExist(datasourceIds, models.Datasource, userId);

  if (check.isValid) {
    const nextVersion = linker.version + 1;
    return {
      isValid: true,
      updateData: {
        datasourceIds,
        version: nextVersion,
        executionStatus: 'not_executed'
      },
      shouldInvalidateCache: true
    };
  }

  const message = check.errors.join(', ');
  return {
    isValid: false,
    error: { status: 400, message }
  };
};
395

396
/**
 * Turn a requested datasourceConfigs change into update fields.
 *
 * @param {Object} datasourceConfigs - New datasource configs
 * @param {Array<string>} targetDatasourceIds - Target datasource IDs
 * @param {Object} linker - Current linker
 * @param {Object} currentUpdateData - Update fields gathered so far; when it
 *   already carries a `version`, no second version bump is added
 * @returns {Object} On success: normalized configs (plus a bumped version when
 *   needed) and a cache invalidation flag. On failure:
 *   `{ isValid: false, error: { status: 400, message } }`.
 */
const processDatasourceConfigsUpdate = (datasourceConfigs, targetDatasourceIds, linker, currentUpdateData) => {
  const check = validateDatasourceConfigs(datasourceConfigs, targetDatasourceIds);

  if (!check.isValid) {
    const message = check.errors.join(', ');
    return {
      isValid: false,
      error: { status: 400, message }
    };
  }

  const normalized = normalizeDatasourceConfigs(datasourceConfigs, targetDatasourceIds);
  const versionAlreadySet = currentUpdateData.version !== undefined;
  const updateData = versionAlreadySet
    ? { datasourceConfigs: normalized }
    : { datasourceConfigs: normalized, version: linker.version + 1 };

  return {
    isValid: true,
    updateData,
    shouldInvalidateCache: true
  };
};
428

429
/**
 * Build update data from request body
 *
 * Only fields present in the body (not `undefined`) are considered. The first
 * field that fails validation short-circuits and its failure result is
 * returned as-is.
 *
 * @param {Object} body - Request body
 * @param {Object} linker - Current linker
 * @param {number} userId - User ID
 * @param {string} linkerId - Linker ID
 * @returns {Promise<Object>} `{ isValid: true, updateData, shouldInvalidateCache }`
 *   or `{ isValid: false, error: { status, message } }`
 */
const buildUpdateData = async (body, linker, userId, linkerId) => {
  const { name, defaultMethodName, datasourceIds, datasourceConfigs, description, environment, isActive } = body;
  const updateData = {};
  let shouldInvalidateCache = false;

  // Handle name update
  if (name !== undefined) {
    const nameResult = await validateNameUpdate(name, linker, userId, linkerId);
    if (!nameResult.isValid) {
      return nameResult;
    }
    updateData.name = nameResult.normalizedName;
  }

  // Handle defaultMethodName update
  if (defaultMethodName !== undefined) {
    updateData.defaultMethodName = defaultMethodName;
  }

  // Handle datasourceIds update
  if (datasourceIds !== undefined) {
    const dsResult = await processDatasourceIdsUpdate(datasourceIds, linker, userId);
    if (!dsResult.isValid) {
      return dsResult;
    }
    Object.assign(updateData, dsResult.updateData);
    shouldInvalidateCache = dsResult.shouldInvalidateCache;
  }

  // Handle datasourceConfigs update
  if (datasourceConfigs !== undefined) {
    // Configs are validated against the IDs being set in this same request,
    // falling back to the linker's current IDs.
    const targetDatasourceIds = datasourceIds || linker.datasourceIds;

    // An explicit null is rejected. The error uses the same { status, message }
    // shape as every other failure so the caller can read both fields.
    if (datasourceConfigs === null) {
      return {
        isValid: false,
        error: { status: 400, message: 'datasourceConfigs is required and cannot be null' }
      };
    }
    const configResult = processDatasourceConfigsUpdate(datasourceConfigs, targetDatasourceIds, linker, updateData);
    if (!configResult.isValid) {
      return configResult;
    }
    Object.assign(updateData, configResult.updateData);
    shouldInvalidateCache = shouldInvalidateCache || configResult.shouldInvalidateCache;
  }

  // Handle simple fields
  if (description !== undefined) {
    updateData.description = description;
  }

  if (environment !== undefined) {
    updateData.environment = environment;
  }

  if (isActive !== undefined) {
    updateData.isActive = Boolean(isActive);
  }

  return {
    isValid: true,
    updateData,
    shouldInvalidateCache
  };
};
503

504
/**
 * Apply a partial update to a linker owned by the requesting user.
 *
 * Responses: 404 when checkLinkerOwnership rejects the linker/user pair, 400
 * when the body fails validation or contains no updatable field, the status
 * carried by a buildUpdateData failure (400 when it carries none), 200 with
 * the sanitized linker on success, 500 on unexpected errors.
 *
 * The linker's cached execution is invalidated when buildUpdateData flags it.
 */
export const updateLinker = async (req, res) => {
  try {
    const userId = req.user?.user_id;
    const { id } = req.params;

    const linker = await models.Linker.findByPk(id);
    if (!checkLinkerOwnership(linker, userId)) {
      return res.status(404).json({ message: 'Linker not found or access denied' });
    }

    // Validate input
    const validation = validateLinkerUpdateInput(req.body);
    if (!validation.isValid) {
      return res.status(400).json({ error: validation.errors.join(', ') });
    }

    // Build update data
    const result = await buildUpdateData(req.body, linker, userId, id);
    if (!result.isValid) {
      // Failures normally carry { status, message }. Tolerate a bare string
      // (or a missing status) instead of calling res.status(undefined).
      const { error } = result;
      const status = error?.status ?? 400;
      const message = typeof error === 'string' ? error : error?.message;
      return res.status(status).json({ message });
    }

    if (Object.keys(result.updateData).length === 0) {
      return res.status(400).json({ message: 'No valid fields provided for update.' });
    }

    result.updateData.updatedAt = new Date();

    await linker.update(result.updateData);

    // Invalidate cache if needed
    if (result.shouldInvalidateCache) {
      await invalidateLinkerCache(linker.id);
      logger.info(`Invalidated cache for linker ${linker.id} due to configuration changes`);
    }

    res.json({
      message: 'Linker updated successfully',
      ...sanitizeLinker(linker, true)
    });
  } catch (error) {
    logger.error('Error updating linker:', error);
    res.status(500).json({ message: 'Error updating linker', error: error.message });
  }
};
549

550
/**
 * Delete a linker owned by the requesting user.
 * The linker's cached execution is invalidated first, then the row is
 * destroyed and an empty 204 is sent. Sends 404 when checkLinkerOwnership
 * rejects the linker/user pair and 500 when anything throws.
 */
export const deleteLinker = async (req, res) => {
  try {
    const requesterId = req.user?.user_id;
    const { id: linkerId } = req.params;

    const target = await models.Linker.findByPk(linkerId);
    const isOwner = checkLinkerOwnership(target, requesterId);
    if (!isOwner) {
      return res.status(404).json({ message: 'Linker not found or access denied' });
    }

    // Drop the cached execution before removing the row itself.
    await invalidateLinkerCache(target.id);
    logger.info(`Invalidated cache for linker ${target.id} before deletion`);

    await target.destroy();

    const noContent = res.status(204);
    noContent.send();
  } catch (error) {
    logger.error('Error deleting linker:', error);
    res.status(500).json({ message: 'Error deleting linker', error: error.message });
  }
};
571

572
/**
 * Execute a linker owned by the requesting user, serving from cache when a
 * non-stale cached result exists.
 *
 * Request body (optional): `options` forwarded to the datasource methods and
 * `mergeStrategy` (default 'indexed') applied to fresh results. Cached
 * responses always carry the 'indexed' merged data that was stored.
 *
 * Responses: 404 when checkLinkerOwnership rejects the linker/user pair, 200
 * with cached or fresh execution data, 500 on unexpected errors. On a 500 the
 * linker's executionStatus is set to 'failure', but only when the failure
 * happened after ownership had been verified.
 */
export const executeLinker = async (req, res) => {
  // Generate trace-like IDs for correlation
  const { traceId, spanId } = generateCorrelationIds();

  // Set once the ownership check has passed; the catch block must not write to
  // a linker this user has not been shown to own.
  let ownershipVerified = false;

  try {
    const userId = req.user?.user_id;
    const { id } = req.params;
    // A request without a parsed body behaves like an empty body instead of
    // throwing on destructuring.
    const { options = {}, mergeStrategy = 'indexed' } = req.body ?? {};

    const linker = await models.Linker.findByPk(id);
    if (!checkLinkerOwnership(linker, userId)) {
      return res.status(404).json({ message: 'Linker not found or access denied' });
    }
    ownershipVerified = true;

    // Capture execution start time
    const executionStartTime = Date.now();
    const executionId = generateLinkerExecutionId(linker.id, executionStartTime);

    logger.debug(`[${executionId}] Starting linker execution`, {
      linkerId: linker.id,
      linkerName: linker.name,
      datasourceCount: linker.datasourceIds.length,
      userId,
      requestId: req.requestId || 'unknown',
      traceId,
      spanId,
      mergeStrategy
    });

    // Check cache first
    const cachedResult = await getCachedLinkerExecution(linker.id);

    // If the cache holds data and is not flagged stale, use it
    if (cachedResult.data && !cachedResult.isStale) {
      logger.info(`Using cached data for linker ${linker.id}`, {
        linkerId: linker.id,
        cacheAge: cachedResult.cacheAge,
        cacheAgeMinutes: Math.floor(cachedResult.cacheAge / 60000)
      });

      // Always use indexed merge strategy for cached data
      const cachedMergedData = cachedResult.data;

      // Create telemetry context
      const telemetryContext = createTelemetryContext({
        traceId,
        spanId,
        operationName: `linker.execute.cached.${linker.id}`,
        correlationId: executionId
      });

      return res.json({
        message: 'Linker executed from cache',
        linkerId: linker.id,
        linkerName: linker.name,
        executionStatus: 'success',
        mergeStrategy: 'indexed',
        mergedData: cachedMergedData,
        fromCache: true,
        cacheAge: cachedResult.cacheAge,
        cacheAgeMinutes: Math.floor(cachedResult.cacheAge / 60000),
        cachedMetadata: cachedResult.metadata,
        telemetryContext
      });
    }

    // If cache is stale, delete the old entry before executing fresh
    if (cachedResult.data && cachedResult.isStale) {
      logger.info(`Cache is stale for linker ${linker.id}, invalidating and re-executing`, {
        linkerId: linker.id,
        cacheAge: cachedResult.cacheAge,
        cacheAgeMinutes: Math.floor(cachedResult.cacheAge / 60000)
      });
      await invalidateLinkerCache(linker.id);
    }

    // Update execution status to pending
    await linker.update({
      executionStatus: 'pending',
      lastExecutedAt: new Date()
    });

    // Execute linker using internal function
    const executionResult = await executeLinkerInternal(
      linker,
      userId,
      executionId,
      traceId,
      spanId,
      options
    );

    // Update linker execution status
    await linker.update({
      executionStatus: executionResult.overallStatus,
      lastExecutedAt: new Date()
    });

    // Cache the execution result with 2 weeks expiration (always using indexed strategy)
    await cacheLinkerExecution(linker.id, executionResult.mergedData, {
      executionId,
      executionStatus: executionResult.overallStatus,
      executionSummary: executionResult.executionSummary,
      traceId,
      spanId
    });

    logger.info(`Linker ${linker.id} executed and cached successfully`, {
      linkerId: linker.id,
      executionStatus: executionResult.overallStatus
    });

    // User requested merge strategy may differ from cached (indexed) strategy
    // Apply the user's requested merge strategy to the detailed results
    const userMergedData = mergeStrategy === 'indexed'
      ? executionResult.mergedData
      : mergeDatasourceResults(executionResult.detailedResults, mergeStrategy);

    // Create telemetry context
    const telemetryContext = createTelemetryContext({
      traceId,
      spanId,
      operationName: `linker.execute.${linker.id}`,
      correlationId: executionId
    });

    res.json({
      message: `Linker executed with status: ${executionResult.overallStatus}`,
      linkerId: linker.id,
      linkerName: linker.name,
      executionStatus: executionResult.overallStatus,
      mergeStrategy,
      mergedData: userMergedData,
      fromCache: false,
      executionMetadata: executionResult.executionMetadata,
      executionSummary: executionResult.executionSummary,
      detailedResults: executionResult.detailedResults,
      telemetryContext
    });

  } catch (error) {
    logger.error('Error executing linker:', error);

    // Update linker status to failure, but only for a linker the requester was
    // verified to own. Errors raised before that point (e.g. a failed lookup)
    // must not let one user flip another user's linker to 'failure'.
    if (ownershipVerified) {
      try {
        const linker = await models.Linker.findByPk(req.params.id);
        if (linker) {
          await linker.update({ executionStatus: 'failure' });
        }
      } catch (updateError) {
        logger.error('Error updating linker status after failure:', updateError);
      }
    }

    // Create error telemetry context
    const telemetryContext = createTelemetryContext({
      traceId,
      spanId,
      operationName: 'linker.execute.error',
      correlationId: `error_${Date.now()}`
    });

    res.status(500).json({
      message: 'Error executing linker',
      error: error.message,
      linkerId: req.params.id,
      telemetryContext
    });
  }
};
741

742
/**
 * Respond with the datasources referenced by a linker, each paired with the
 * linker's per-datasource config (or null when the linker has none for it).
 * Only datasources whose ownerId matches the requesting user are loaded.
 * Sends 404 when checkLinkerOwnership rejects the linker/user pair and 500
 * when anything throws.
 */
export const getLinkerDatasources = async (req, res) => {
  try {
    const requesterId = req.user?.user_id;
    const { id: linkerId } = req.params;

    const linker = await models.Linker.findByPk(linkerId);
    const isOwner = checkLinkerOwnership(linker, requesterId);
    if (!isOwner) {
      return res.status(404).json({ message: 'Linker not found or access denied' });
    }

    // Load the referenced datasources, restricted to this user's own.
    const ownedDatasources = await models.Datasource.findAll({
      where: {
        id: linker.datasourceIds,
        ownerId: requesterId
      }
    });

    // Pair each datasource with its config entry stored on the linker.
    const describeDatasource = (ds) => ({
      id: ds.id,
      name: ds.name,
      definitionId: ds.definitionId,
      description: ds.description,
      environment: ds.environment,
      isActive: ds.isActive,
      testStatus: ds.testStatus,
      linkerConfig: linker.datasourceConfigs?.[ds.id] || null
    });
    const described = ownedDatasources.map((ds) => describeDatasource(ds));

    res.json({
      linkerId: linker.id,
      linkerName: linker.name,
      datasourceCount: ownedDatasources.length,
      datasources: described
    });

  } catch (error) {
    logger.error('Error getting linker datasources:', error);
    const failure = {
      message: 'Error getting linker datasources',
      error: error.message
    };
    res.status(500).json(failure);
  }
};
790

791
// Expose the module-level catalog instance as a named export of this controller.
export { datasourceCatalog };
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc