• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

statuscompliance / status-backend / 19856219812

02 Dec 2025 10:58AM UTC coverage: 91.137% (+2.5%) from 88.591%
19856219812

push

github

alvarobernal2412
refactor: fix bad smells

1450 of 1672 branches covered (86.72%)

Branch coverage included in aggregate %.

87 of 114 new or added lines in 5 files covered. (76.32%)

7 existing lines in 2 files now uncovered.

2735 of 2920 relevant lines covered (93.66%)

27.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.71
/src/controllers/linker.controller.js
1
import { models } from '../models/models.js';
2
import { getDatabinderCatalog } from '../config/databinder.js';
3
import logger from '../config/logger.js';
4
import { 
5
  generateInstanceId, 
6
  extractResultData,
7
  applyPropertyMapping,
8
  generateCorrelationIds,
9
  createTelemetryContext,
10
  normalizeName,
11
  sanitizeLinker,
12
  checkLinkerOwnership,
13
  validateLinkerInput,
14
  validateLinkerUpdateInput,
15
  validateDatasourcesExist,
16
  validateDatasourceConfigs,
17
  generateLinkerExecutionId,
18
  mergeDatasourceResults,
19
  createLinkerExecutionMetadata,
20
  normalizeDatasourceConfigs,
21
  createLinkerExecutionSummary,
22
  cacheLinkerExecution,
23
  getCachedLinkerExecution,
24
  invalidateLinkerCache
25
} from '../utils/databinder/index.js';
26

27
// Get the initialized DatasourceCatalog
28
const datasourceCatalog = getDatabinderCatalog();
78✔
29

30
/**
31
 * Internal function to execute linker and fetch data from all datasources
32
 * This is used both when creating a linker and when executing it via /execute endpoint
33
 * @param {Object} linker - The linker instance
34
 * @param {number} userId - The user ID
35
 * @param {string} executionId - Execution ID for logging
36
 * @param {string} traceId - Trace ID for telemetry
37
 * @param {string} spanId - Span ID for telemetry
38
 * @param {Object} options - Options to pass to datasource methods
39
 * @returns {Promise<Object>} Execution result with status, data, and metadata
40
 */
41
const executeLinkerInternal = async (linker, userId, executionId, traceId, spanId, options = {}) => {
78!
42
  logger.debug(`[${executionId}] Starting internal linker execution`, {
×
43
    linkerId: linker.id,
44
    linkerName: linker.name,
45
    datasourceCount: linker.datasourceIds.length,
46
    userId,
47
    traceId,
48
    spanId
49
  });
50

51
  const executionStartTime = Date.now();
×
52

53
  // Fetch all datasources
54
  const datasources = await models.Datasource.findAll({
×
55
    where: {
56
      id: linker.datasourceIds,
57
      ownerId: userId
58
    }
59
  });
60

61
  if (datasources.length !== linker.datasourceIds.length) {
×
NEW
62
    const foundIds = new Set(datasources.map(ds => ds.id));
×
NEW
63
    const missingIds = linker.datasourceIds.filter(id => !foundIds.has(id));
×
64
    
65
    throw new Error(`Some datasources are no longer available. Missing: ${missingIds.join(', ')}`);
×
66
  }
67

68
  // Execute each datasource
69
  const results = [];
×
70
  
71
  for (const datasource of datasources) {
×
72
    try {
×
73
      const dsConfig = linker.datasourceConfigs?.[datasource.id];
×
74
      const methodName = dsConfig?.methodConfig?.methodName || linker.defaultMethodName;
×
NEW
75
      const methodOptions = { ...options, ...dsConfig?.methodConfig?.options };
×
76
      const propertyMapping = dsConfig?.propertyMapping || null;
×
77

78
      // Create datasource instance
79
      const instanceId = generateInstanceId(datasource.id, Date.now(), 'linker');
×
80
      const instance = datasourceCatalog.createDatasourceInstance(
×
81
        datasource.definitionId,
82
        datasource.config,
83
        instanceId
84
      );
85

86
      // Check if method exists
87
      if (!instance.methods[methodName]) {
×
88
        results.push({
×
89
          datasourceId: datasource.id,
90
          datasourceName: datasource.name,
91
          success: false,
92
          error: `Method '${methodName}' not available`,
93
          data: null
94
        });
95
        continue;
×
96
      }
97

98
      // Execute method
99
      const callStartTime = Date.now();
×
100
      const result = await instance.methods[methodName](methodOptions);
×
101
      const callEndTime = Date.now();
×
102

103
      // Extract and transform data
104
      const extractedResult = extractResultData(result);
×
105
      const finalResult = propertyMapping ? applyPropertyMapping(extractedResult, propertyMapping) : extractedResult;
×
106

107
      results.push({
×
108
        datasourceId: datasource.id,
109
        datasourceName: datasource.name,
110
        definitionId: datasource.definitionId,
111
        methodUsed: methodName,
112
        success: true,
113
        error: null,
114
        data: finalResult,
115
        executionDuration: `${callEndTime - callStartTime}ms`,
116
        propertyMappingApplied: !!propertyMapping
117
      });
118

119
    } catch (error) {
120
      logger.error(`Error executing datasource ${datasource.id} in linker ${linker.id}:`, error);
×
121
      results.push({
×
122
        datasourceId: datasource.id,
123
        datasourceName: datasource.name,
124
        success: false,
125
        error: error.message,
126
        data: null
127
      });
128
    }
129
  }
130

131
  // Determine overall execution status
132
  const allSuccessful = results.every(r => r.success);
×
133
  const anySuccessful = results.some(r => r.success);
×
134
  
135
  let overallStatus;
NEW
136
  if (allSuccessful) {
×
NEW
137
    overallStatus = 'success';
×
NEW
138
  } else if (anySuccessful) {
×
NEW
139
    overallStatus = 'success';
×
140
  } else {
NEW
141
    overallStatus = 'failure';
×
142
  }
143

144
  const executionEndTime = Date.now();
×
145

146
  // Always use 'indexed' merge strategy for caching
147
  const mergedData = mergeDatasourceResults(results, 'indexed');
×
148

149
  // Create execution metadata
150
  const executionMetadata = createLinkerExecutionMetadata({
×
151
    linkerId: linker.id,
152
    datasourceIds: linker.datasourceIds,
153
    executionId,
154
    startTime: executionStartTime,
155
    endTime: executionEndTime,
156
    results
157
  });
158

159
  // Create execution summary
160
  const executionSummary = createLinkerExecutionSummary(results);
×
161

162
  logger.debug(`[${executionId}] Internal linker execution completed`, {
×
163
    executionStatus: overallStatus,
164
    executionDuration: `${executionEndTime - executionStartTime}ms`,
165
    successfulDatasources: executionSummary.successful,
166
    failedDatasources: executionSummary.failed,
167
    traceId,
168
    spanId
169
  });
170

171
  return {
×
172
    overallStatus,
173
    mergedData,
174
    executionMetadata,
175
    executionSummary,
176
    detailedResults: results,
177
    executionStartTime,
178
    executionEndTime
179
  };
180
};
181

182
export const listLinkers = async (req, res) => {
78✔
183
  try {
4✔
184
    const userId = req.user?.user_id;
4✔
185
    if (!userId) return res.status(401).json({ message: 'Unauthorized' });
4✔
186

187
    const linkers = await models.Linker.findAll({
3✔
188
      where: { ownerId: userId },
189
    });
190

191
    const sanitized = linkers.map((linker) => sanitizeLinker(linker));
2✔
192
    res.json(sanitized);
2✔
193
  } catch (error) {
194
    logger.error('Error listing linkers:', error);
1✔
195
    res.status(500).json({ message: 'Error listing linkers', error: error.message });
1✔
196
  }
197
};
198

199
export const getLinker = async (req, res) => {
78✔
200
  try {
4✔
201
    const userId = req.user?.user_id;
4✔
202
    const { id } = req.params;
4✔
203

204
    const linker = await models.Linker.findByPk(id);
4✔
205

206
    if (!checkLinkerOwnership(linker, userId)) {
3✔
207
      return res.status(404).json({ message: 'Linker not found or access denied' });
2✔
208
    }
209

210
    res.json(sanitizeLinker(linker, true));
1✔
211
  } catch (error) {
212
    logger.error('Error fetching linker:', error);
1✔
213
    res.status(500).json({ message: 'Error fetching linker', error: error.message });
1✔
214
  }
215
};
216

217
export const createLinker = async (req, res) => {
78✔
218
  try {
5✔
219
    const userId = req.user?.user_id;
5✔
220
    if (!userId) return res.status(401).json({ message: 'Unauthorized' });
5✔
221

222
    const { name, defaultMethodName, datasourceIds, datasourceConfigs, description, environment } = req.body;
4✔
223

224
    // Validate input
225
    const validation = validateLinkerInput({ datasourceIds, defaultMethodName, datasourceConfigs });
4✔
226
    if (!validation.isValid) {
4✔
227
      return res.status(400).json({ error: validation.errors.join(', ') });
1✔
228
    }
229

230
    // Validate that all datasource IDs exist and belong to the user
231
    const dsValidation = await validateDatasourcesExist(datasourceIds, models.Datasource, userId);
3✔
232
    if (!dsValidation.isValid) {
3✔
233
      return res.status(400).json({ error: dsValidation.errors.join(', ') });
1✔
234
    }
235

236
    // Validate datasource configs structure if provided
237
    if (datasourceConfigs) {
2✔
238
      const configValidation = validateDatasourceConfigs(datasourceConfigs, datasourceIds);
1✔
239
      if (!configValidation.isValid) {
1!
240
        return res.status(400).json({ error: configValidation.errors.join(', ') });
1✔
241
      }
242
    }
243

244
    // Normalize and check for existing linker with same name
245
    const normalizedName = name ? normalizeName(name) : null;
1!
246
    if (normalizedName) {
1!
247
      const existing = await models.Linker.findOne({
1✔
248
        where: { name: normalizedName, ownerId: userId }
249
      });
250
      if (existing) {
1!
251
        return res.status(409).json({ message: 'A linker with this name already exists.' });
1✔
252
      }
253
    }
254

255
    // Normalize datasource configs
256
    const normalizedConfigs = normalizeDatasourceConfigs(datasourceConfigs, datasourceIds);
×
257

258
    // Create linker
259
    const linkerData = {
×
260
      name: normalizedName,
261
      defaultMethodName: defaultMethodName || 'default',
×
262
      datasourceIds,
263
      datasourceConfigs: Object.keys(normalizedConfigs).length > 0 ? normalizedConfigs : null,
×
264
      description: description || null,
×
265
      environment: environment || 'production',
×
266
      isActive: true,
267
      createdBy: req.user.username,
268
      version: 1,
269
      ownerId: userId,
270
    };
271

272
    const newLinker = await models.Linker.create(linkerData);
×
273

274
    // Execute linker immediately after creation and cache the results
275
    try {
×
276
      const { traceId, spanId } = generateCorrelationIds();
×
277
      const executionId = generateLinkerExecutionId(newLinker.id, Date.now());
×
278
      
279
      logger.info(`Executing linker immediately after creation: ${newLinker.id}`);
×
280
      
281
      const executionResult = await executeLinkerInternal(
×
282
        newLinker,
283
        userId,
284
        executionId,
285
        traceId,
286
        spanId,
287
        {}
288
      );
289

290
      // Update linker execution status
291
      await newLinker.update({
×
292
        executionStatus: executionResult.overallStatus,
293
        lastExecutedAt: new Date()
294
      });
295

296
      // Cache the execution result with 2 weeks expiration
297
      await cacheLinkerExecution(newLinker.id, executionResult.mergedData, {
×
298
        executionId,
299
        executionStatus: executionResult.overallStatus,
300
        executionSummary: executionResult.executionSummary,
301
        traceId,
302
        spanId
303
      });
304

305
      logger.info(`Linker ${newLinker.id} executed and cached successfully`, {
×
306
        linkerId: newLinker.id,
307
        executionStatus: executionResult.overallStatus
308
      });
309

310
    } catch (execError) {
311
      // Log error but don't fail the creation
312
      logger.error(`Error executing linker after creation: ${newLinker.id}`, execError);
×
313
      await newLinker.update({
×
314
        executionStatus: 'failure',
315
        lastExecutedAt: new Date()
316
      });
317
    }
318

319
    const response = sanitizeLinker(newLinker, true);
×
320
    res.status(201).json({ 
×
321
      message: 'Linker created successfully',
322
      datasourceCount: datasourceIds.length,
323
      ...response 
324
    });
325
  } catch (error) {
326
    logger.error('Error creating linker:', error);
×
327
    res.status(500).json({ message: 'Error creating linker', error: error.message });
×
328
  }
329
};
330

331
/**
332
 * Validate and prepare name update
333
 * @param {string} name - New name
334
 * @param {Object} linker - Current linker
335
 * @param {number} userId - User ID
336
 * @param {string} linkerId - Linker ID
337
 * @returns {Promise<Object>} Result with normalized name or error
338
 */
339
const validateNameUpdate = async (name, linker, userId, linkerId) => {
78✔
340
  const normalizedName = normalizeName(name);
2✔
341
  
342
  if (normalizedName === linker.name) {
2!
NEW
343
    return { normalizedName, isValid: true };
×
344
  }
345

346
  const existing = await models.Linker.findOne({
2✔
347
    where: { name: normalizedName, ownerId: userId }
348
  });
349

350
  if (existing && existing.id !== linkerId) {
2✔
351
    return {
1✔
352
      isValid: false,
353
      error: { status: 409, message: 'A linker with this name already exists.' }
354
    };
355
  }
356

357
  return { normalizedName, isValid: true };
1✔
358
};
359

360
/**
361
 * Process datasource IDs update
362
 * @param {Array<string>} datasourceIds - New datasource IDs
363
 * @param {Object} linker - Current linker
364
 * @param {number} userId - User ID
365
 * @returns {Promise<Object>} Update data and cache invalidation flag
366
 */
367
const processDatasourceIdsUpdate = async (datasourceIds, linker, userId) => {
78✔
368
  const dsValidation = await validateDatasourcesExist(datasourceIds, models.Datasource, userId);
3✔
369
  
370
  if (!dsValidation.isValid) {
3✔
371
    return {
1✔
372
      isValid: false,
373
      error: { status: 400, message: dsValidation.errors.join(', ') }
374
    };
375
  }
376

377
  return {
2✔
378
    isValid: true,
379
    updateData: {
380
      datasourceIds,
381
      version: linker.version + 1,
382
      executionStatus: 'not_executed'
383
    },
384
    shouldInvalidateCache: true
385
  };
386
};
387

388
/**
389
 * Process datasource configs update
390
 * @param {Object} datasourceConfigs - New datasource configs
391
 * @param {Array<string>} targetDatasourceIds - Target datasource IDs
392
 * @param {Object} linker - Current linker
393
 * @param {Object} currentUpdateData - Current update data
394
 * @returns {Object} Update data and cache invalidation flag
395
 */
396
const processDatasourceConfigsUpdate = (datasourceConfigs, targetDatasourceIds, linker, currentUpdateData) => {
78✔
NEW
397
  const configValidation = validateDatasourceConfigs(datasourceConfigs, targetDatasourceIds);
×
398
  
NEW
399
  if (!configValidation.isValid) {
×
NEW
400
    return {
×
401
      isValid: false,
402
      error: { status: 400, message: configValidation.errors.join(', ') }
403
    };
404
  }
405

NEW
406
  const updateData = {
×
407
    datasourceConfigs: normalizeDatasourceConfigs(datasourceConfigs, targetDatasourceIds)
408
  };
409

NEW
410
  if (currentUpdateData.version === undefined) {
×
NEW
411
    updateData.version = linker.version + 1;
×
412
  }
413

NEW
414
  return {
×
415
    isValid: true,
416
    updateData,
417
    shouldInvalidateCache: true
418
  };
419
};
420

421
/**
422
 * Build update data from request body
423
 * @param {Object} body - Request body
424
 * @param {Object} linker - Current linker
425
 * @param {number} userId - User ID
426
 * @param {string} linkerId - Linker ID
427
 * @returns {Promise<Object>} Update data and flags
428
 */
429
const buildUpdateData = async (body, linker, userId, linkerId) => {
78✔
430
  const { name, defaultMethodName, datasourceIds, datasourceConfigs, description, environment, isActive } = body;
6✔
431
  const updateData = {};
6✔
432
  let shouldInvalidateCache = false;
6✔
433

434
  // Handle name update
435
  if (name !== undefined) {
6✔
436
    const nameResult = await validateNameUpdate(name, linker, userId, linkerId);
2✔
437
    if (!nameResult.isValid) {
2✔
438
      return nameResult;
1✔
439
    }
440
    updateData.name = nameResult.normalizedName;
1✔
441
  }
442

443
  // Handle defaultMethodName update
444
  if (defaultMethodName !== undefined) {
5!
NEW
445
    updateData.defaultMethodName = defaultMethodName;
×
446
  }
447

448
  // Handle datasourceIds update
449
  if (datasourceIds !== undefined) {
5✔
450
    const dsResult = await processDatasourceIdsUpdate(datasourceIds, linker, userId);
3✔
451
    if (!dsResult.isValid) {
3✔
452
      return dsResult;
1✔
453
    }
454
    Object.assign(updateData, dsResult.updateData);
2✔
455
    shouldInvalidateCache = dsResult.shouldInvalidateCache;
2✔
456
  }
457

458
  // Handle datasourceConfigs update
459
  if (datasourceConfigs !== undefined) {
4!
NEW
460
    const targetDatasourceIds = datasourceIds || linker.datasourceIds;
×
NEW
461
    const configResult = processDatasourceConfigsUpdate(datasourceConfigs, targetDatasourceIds, linker, updateData);
×
NEW
462
    if (!configResult.isValid) {
×
NEW
463
      return configResult;
×
464
    }
NEW
465
    Object.assign(updateData, configResult.updateData);
×
NEW
466
    shouldInvalidateCache = shouldInvalidateCache || configResult.shouldInvalidateCache;
×
467
  }
468

469
  // Handle simple fields
470
  if (description !== undefined) {
4!
NEW
471
    updateData.description = description;
×
472
  }
473

474
  if (environment !== undefined) {
4!
NEW
475
    updateData.environment = environment;
×
476
  }
477

478
  if (isActive !== undefined) {
4!
NEW
479
    updateData.isActive = Boolean(isActive);
×
480
  }
481

482
  return {
4✔
483
    isValid: true,
484
    updateData,
485
    shouldInvalidateCache
486
  };
487
};
488

489
export const updateLinker = async (req, res) => {
78✔
490
  try {
8✔
491
    const userId = req.user?.user_id;
8✔
492
    const { id } = req.params;
8✔
493

494
    const linker = await models.Linker.findByPk(id);
8✔
495
    if (!checkLinkerOwnership(linker, userId)) {
8✔
496
      return res.status(404).json({ message: 'Linker not found or access denied' });
2✔
497
    }
498

499
    // Validate input
500
    const validation = validateLinkerUpdateInput(req.body);
6✔
501
    if (!validation.isValid) {
6!
NEW
502
      return res.status(400).json({ error: validation.errors.join(', ') });
×
503
    }
504

505
    // Build update data
506
    const result = await buildUpdateData(req.body, linker, userId, id);
6✔
507
    if (!result.isValid) {
6✔
508
      return res.status(result.error.status).json({ message: result.error.message });
2✔
509
    }
510

511
    if (Object.keys(result.updateData).length === 0) {
4✔
512
      return res.status(400).json({ message: 'No valid fields provided for update.' });
1✔
513
    }
514

515
    result.updateData.updatedAt = new Date();
3✔
516

517
    await linker.update(result.updateData);
3✔
518

519
    // Invalidate cache if needed
520
    if (result.shouldInvalidateCache) {
3✔
521
      await invalidateLinkerCache(linker.id);
2✔
522
      logger.info(`Invalidated cache for linker ${linker.id} due to configuration changes`);
2✔
523
    }
524

525
    res.json({ 
3✔
526
      message: 'Linker updated successfully', 
527
      ...sanitizeLinker(linker, true) 
528
    });
529
  } catch (error) {
530
    logger.error('Error updating linker:', error);
×
531
    res.status(500).json({ message: 'Error updating linker', error: error.message });
×
532
  }
533
};
534

535
export const deleteLinker = async (req, res) => {
78✔
536
  try {
4✔
537
    const userId = req.user?.user_id;
4✔
538
    const { id } = req.params;
4✔
539

540
    const linker = await models.Linker.findByPk(id);
4✔
541
    if (!checkLinkerOwnership(linker, userId)) {
3✔
542
      return res.status(404).json({ message: 'Linker not found or access denied' });
2✔
543
    }
544

545
    // Invalidate cache before deleting
546
    await invalidateLinkerCache(linker.id);
1✔
547
    logger.info(`Invalidated cache for linker ${linker.id} before deletion`);
1✔
548

549
    await linker.destroy();
1✔
550
    res.status(204).send();
1✔
551
  } catch (error) {
552
    logger.error('Error deleting linker:', error);
1✔
553
    res.status(500).json({ message: 'Error deleting linker', error: error.message });
1✔
554
  }
555
};
556

557
export const executeLinker = async (req, res) => {
78✔
558
  // Generate trace-like IDs for correlation
559
  const { traceId, spanId } = generateCorrelationIds();
×
560
  
561
  try {
×
562
    const userId = req.user?.user_id;
×
563
    const { id } = req.params;
×
564
    const { options = {}, mergeStrategy = 'indexed' } = req.body;
×
565

566
    const linker = await models.Linker.findByPk(id);
×
567
    if (!checkLinkerOwnership(linker, userId)) {
×
568
      return res.status(404).json({ message: 'Linker not found or access denied' });
×
569
    }
570

571
    // Capture execution start time
572
    const executionStartTime = Date.now();
×
573
    const executionId = generateLinkerExecutionId(linker.id, executionStartTime);
×
574

575
    logger.debug(`[${executionId}] Starting linker execution`, {
×
576
      linkerId: linker.id,
577
      linkerName: linker.name,
578
      datasourceCount: linker.datasourceIds.length,
579
      userId,
580
      requestId: req.requestId || 'unknown',
×
581
      traceId,
582
      spanId,
583
      mergeStrategy
584
    });
585

586
    // Check cache first
587
    const cachedResult = await getCachedLinkerExecution(linker.id);
×
588
    
589
    // If cache is valid (exists and not stale - less than 1 hour old), use it
590
    if (cachedResult.data && !cachedResult.isStale) {
×
591
      logger.info(`Using cached data for linker ${linker.id}`, {
×
592
        linkerId: linker.id,
593
        cacheAge: cachedResult.cacheAge,
594
        cacheAgeMinutes: Math.floor(cachedResult.cacheAge / 60000)
595
      });
596

597
      // Always use indexed merge strategy for cached data
598
      const cachedMergedData = cachedResult.data;
×
599

600
      // Create telemetry context
601
      const telemetryContext = createTelemetryContext({
×
602
        traceId,
603
        spanId,
604
        operationName: `linker.execute.cached.${linker.id}`,
605
        correlationId: executionId
606
      });
607

608
      return res.json({
×
609
        message: 'Linker executed from cache',
610
        linkerId: linker.id,
611
        linkerName: linker.name,
612
        executionStatus: 'success',
613
        mergeStrategy: 'indexed',
614
        mergedData: cachedMergedData,
615
        fromCache: true,
616
        cacheAge: cachedResult.cacheAge,
617
        cacheAgeMinutes: Math.floor(cachedResult.cacheAge / 60000),
618
        cachedMetadata: cachedResult.metadata,
619
        telemetryContext
620
      });
621
    }
622

623
    // If cache is stale or doesn't exist, delete old cache and execute fresh
624
    if (cachedResult.data && cachedResult.isStale) {
×
625
      logger.info(`Cache is stale for linker ${linker.id}, invalidating and re-executing`, {
×
626
        linkerId: linker.id,
627
        cacheAge: cachedResult.cacheAge,
628
        cacheAgeMinutes: Math.floor(cachedResult.cacheAge / 60000)
629
      });
630
      await invalidateLinkerCache(linker.id);
×
631
    }
632

633
    // Update execution status to pending
634
    await linker.update({ 
×
635
      executionStatus: 'pending',
636
      lastExecutedAt: new Date()
637
    });
638

639
    // Execute linker using internal function
640
    const executionResult = await executeLinkerInternal(
×
641
      linker,
642
      userId,
643
      executionId,
644
      traceId,
645
      spanId,
646
      options
647
    );
648

649
    // Update linker execution status
650
    await linker.update({ 
×
651
      executionStatus: executionResult.overallStatus,
652
      lastExecutedAt: new Date()
653
    });
654

655
    // Cache the execution result with 2 weeks expiration (always using indexed strategy)
656
    await cacheLinkerExecution(linker.id, executionResult.mergedData, {
×
657
      executionId,
658
      executionStatus: executionResult.overallStatus,
659
      executionSummary: executionResult.executionSummary,
660
      traceId,
661
      spanId
662
    });
663

664
    logger.info(`Linker ${linker.id} executed and cached successfully`, {
×
665
      linkerId: linker.id,
666
      executionStatus: executionResult.overallStatus
667
    });
668

669
    // User requested merge strategy may differ from cached (indexed) strategy
670
    // Apply the user's requested merge strategy to the detailed results
671
    const userMergedData = mergeStrategy === 'indexed' 
×
672
      ? executionResult.mergedData 
673
      : mergeDatasourceResults(executionResult.detailedResults, mergeStrategy);
674

675
    // Create telemetry context
676
    const telemetryContext = createTelemetryContext({
×
677
      traceId,
678
      spanId,
679
      operationName: `linker.execute.${linker.id}`,
680
      correlationId: executionId
681
    });
682

683
    res.json({
×
684
      message: `Linker executed with status: ${executionResult.overallStatus}`,
685
      linkerId: linker.id,
686
      linkerName: linker.name,
687
      executionStatus: executionResult.overallStatus,
688
      mergeStrategy,
689
      mergedData: userMergedData,
690
      fromCache: false,
691
      executionMetadata: executionResult.executionMetadata,
692
      executionSummary: executionResult.executionSummary,
693
      detailedResults: executionResult.detailedResults,
694
      telemetryContext
695
    });
696

697
  } catch (error) {
698
    logger.error('Error executing linker:', error);
×
699
    
700
    // Update linker status to failure
701
    try {
×
702
      const linker = await models.Linker.findByPk(req.params.id);
×
703
      if (linker) {
×
704
        await linker.update({ executionStatus: 'failure' });
×
705
      }
706
    } catch (updateError) {
707
      logger.error('Error updating linker status after failure:', updateError);
×
708
    }
709

710
    // Create error telemetry context
711
    const telemetryContext = createTelemetryContext({
×
712
      traceId,
713
      spanId,
714
      operationName: 'linker.execute.error',
715
      correlationId: `error_${Date.now()}`
716
    });
717

718
    res.status(500).json({ 
×
719
      message: 'Error executing linker', 
720
      error: error.message,
721
      linkerId: req.params.id,
722
      telemetryContext
723
    });
724
  }
725
};
726

727
export const getLinkerDatasources = async (req, res) => {
78✔
728
  try {
5✔
729
    const userId = req.user?.user_id;
5✔
730
    const { id } = req.params;
5✔
731

732
    const linker = await models.Linker.findByPk(id);
5✔
733
    if (!checkLinkerOwnership(linker, userId)) {
5✔
734
      return res.status(404).json({ message: 'Linker not found or access denied' });
2✔
735
    }
736

737
    // Fetch all datasources
738
    const datasources = await models.Datasource.findAll({
3✔
739
      where: {
740
        id: linker.datasourceIds,
741
        ownerId: userId
742
      }
743
    });
744

745
    // Map datasources with their configs from the linker
746
    const datasourcesWithConfigs = datasources.map(ds => {
2✔
747
      const config = linker.datasourceConfigs?.[ds.id];
2✔
748
      return {
2✔
749
        id: ds.id,
750
        name: ds.name,
751
        definitionId: ds.definitionId,
752
        description: ds.description,
753
        environment: ds.environment,
754
        isActive: ds.isActive,
755
        testStatus: ds.testStatus,
756
        linkerConfig: config || null
3✔
757
      };
758
    });
759

760
    res.json({
2✔
761
      linkerId: linker.id,
762
      linkerName: linker.name,
763
      datasourceCount: datasources.length,
764
      datasources: datasourcesWithConfigs
765
    });
766

767
  } catch (error) {
768
    logger.error('Error getting linker datasources:', error);
1✔
769
    res.status(500).json({ 
1✔
770
      message: 'Error getting linker datasources', 
771
      error: error.message 
772
    });
773
  }
774
};
775

776
export { datasourceCatalog };
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc