• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

statuscompliance / status-backend / 20269538425

16 Dec 2025 01:26PM UTC coverage: 90.985% (-0.4%) from 91.375%
20269538425

push

github

alvarobernal2412
chore(linker): make datasourceConfigs required field

1470 of 1698 branches covered (86.57%)

Branch coverage included in aggregate %.

40 of 51 new or added lines in 4 files covered. (78.43%)

1 existing line in 1 file now uncovered.

2769 of 2961 relevant lines covered (93.52%)

28.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

48.14
/src/controllers/linker.controller.js
1
import { models } from '../models/models.js';
2
import { getDatabinderCatalog } from '../config/databinder.js';
3
import logger from '../config/logger.js';
4
import { 
5
  generateInstanceId, 
6
  extractResultData,
7
  applyPropertyMapping,
8
  generateCorrelationIds,
9
  createTelemetryContext,
10
  normalizeName,
11
  sanitizeLinker,
12
  checkLinkerOwnership,
13
  validateLinkerInput,
14
  validateLinkerUpdateInput,
15
  validateDatasourcesExist,
16
  validateDatasourceConfigs,
17
  generateLinkerExecutionId,
18
  mergeDatasourceResults,
19
  createLinkerExecutionMetadata,
20
  normalizeDatasourceConfigs,
21
  createLinkerExecutionSummary,
22
  cacheLinkerExecution,
23
  getCachedLinkerExecution,
24
  invalidateLinkerCache
25
} from '../utils/databinder/index.js';
26

27
// Get the initialized DatasourceCatalog
28
const datasourceCatalog = getDatabinderCatalog();
78✔
29

30
/**
31
 * Internal function to execute linker and fetch data from all datasources
32
 * This is used both when creating a linker and when executing it via /execute endpoint
33
 * @param {Object} linker - The linker instance
34
 * @param {number} userId - The user ID
35
 * @param {string} executionId - Execution ID for logging
36
 * @param {string} traceId - Trace ID for telemetry
37
 * @param {string} spanId - Span ID for telemetry
38
 * @param {Object} options - Options to pass to datasource methods
39
 * @returns {Promise<Object>} Execution result with status, data, and metadata
40
 */
41
const executeLinkerInternal = async (linker, userId, executionId, traceId, spanId, options = {}) => {
78!
42
  logger.debug(`[${executionId}] Starting internal linker execution`, {
×
43
    linkerId: linker.id,
44
    linkerName: linker.name,
45
    datasourceCount: linker.datasourceIds.length,
46
    userId,
47
    traceId,
48
    spanId
49
  });
50

51
  const executionStartTime = Date.now();
×
52

53
  // Fetch all datasources
54
  const datasources = await models.Datasource.findAll({
×
55
    where: {
56
      id: linker.datasourceIds,
57
      ownerId: userId
58
    }
59
  });
60

61
  if (datasources.length !== linker.datasourceIds.length) {
×
62
    const foundIds = new Set(datasources.map(ds => ds.id));
×
63
    const missingIds = linker.datasourceIds.filter(id => !foundIds.has(id));
×
64
    
65
    throw new Error(`Some datasources are no longer available. Missing: ${missingIds.join(', ')}`);
×
66
  }
67

68
  // Execute each datasource
69
  const results = [];
×
70
  
71
  for (const datasource of datasources) {
×
72
    try {
×
73
      const dsConfig = linker.datasourceConfigs?.[datasource.id];
×
74
      const methodName = dsConfig?.methodConfig?.methodName || linker.defaultMethodName;
×
75
      const methodOptions = { ...options, ...dsConfig?.methodConfig?.options };
×
76
      const propertyMapping = dsConfig?.propertyMapping || null;
×
77

78
      // Create datasource instance
79
      const instanceId = generateInstanceId(datasource.id, Date.now(), 'linker');
×
80
      const instance = datasourceCatalog.createDatasourceInstance(
×
81
        datasource.definitionId,
82
        datasource.config,
83
        instanceId
84
      );
85

86
      // Check if method exists
87
      if (!instance.methods[methodName]) {
×
88
        results.push({
×
89
          datasourceId: datasource.id,
90
          datasourceName: datasource.name,
91
          success: false,
92
          error: `Method '${methodName}' not available`,
93
          data: null
94
        });
95
        continue;
×
96
      }
97

98
      // Execute method
99
      const callStartTime = Date.now();
×
100
      const result = await instance.methods[methodName](methodOptions);
×
101
      const callEndTime = Date.now();
×
102

103
      // Extract and transform data
104
      const extractedResult = extractResultData(result);
×
105
      const finalResult = propertyMapping ? applyPropertyMapping(extractedResult, propertyMapping) : extractedResult;
×
106

107
      results.push({
×
108
        datasourceId: datasource.id,
109
        datasourceName: datasource.name,
110
        definitionId: datasource.definitionId,
111
        methodUsed: methodName,
112
        success: true,
113
        error: null,
114
        data: finalResult,
115
        executionDuration: `${callEndTime - callStartTime}ms`,
116
        propertyMappingApplied: !!propertyMapping
117
      });
118

119
    } catch (error) {
120
      logger.error(`Error executing datasource ${datasource.id} in linker ${linker.id}:`, error);
×
121
      results.push({
×
122
        datasourceId: datasource.id,
123
        datasourceName: datasource.name,
124
        success: false,
125
        error: error.message,
126
        data: null
127
      });
128
    }
129
  }
130

131
  // Determine overall execution status
132
  const allSuccessful = results.every(r => r.success);
×
133
  const anySuccessful = results.some(r => r.success);
×
134
  
135
  let overallStatus;
136
  if (allSuccessful) {
×
137
    overallStatus = 'success';
×
138
  } else if (anySuccessful) {
×
139
    overallStatus = 'success';
×
140
  } else {
141
    overallStatus = 'failure';
×
142
  }
143

144
  const executionEndTime = Date.now();
×
145

146
  // Always use 'indexed' merge strategy for caching
147
  const mergedData = mergeDatasourceResults(results, 'indexed');
×
148

149
  // Create execution metadata
150
  const executionMetadata = createLinkerExecutionMetadata({
×
151
    linkerId: linker.id,
152
    datasourceIds: linker.datasourceIds,
153
    executionId,
154
    startTime: executionStartTime,
155
    endTime: executionEndTime,
156
    results
157
  });
158

159
  // Create execution summary
160
  const executionSummary = createLinkerExecutionSummary(results);
×
161

162
  logger.debug(`[${executionId}] Internal linker execution completed`, {
×
163
    executionStatus: overallStatus,
164
    executionDuration: `${executionEndTime - executionStartTime}ms`,
165
    successfulDatasources: executionSummary.successful,
166
    failedDatasources: executionSummary.failed,
167
    traceId,
168
    spanId
169
  });
170

171
  return {
×
172
    overallStatus,
173
    mergedData,
174
    executionMetadata,
175
    executionSummary,
176
    detailedResults: results,
177
    executionStartTime,
178
    executionEndTime
179
  };
180
};
181

182
export const listLinkers = async (req, res) => {
78✔
183
  try {
4✔
184
    const userId = req.user?.user_id;
4✔
185
    if (!userId) return res.status(401).json({ message: 'Unauthorized' });
4✔
186

187
    const linkers = await models.Linker.findAll({
3✔
188
      where: { ownerId: userId },
189
    });
190

191
    const sanitized = linkers.map((linker) => sanitizeLinker(linker));
2✔
192
    res.json(sanitized);
2✔
193
  } catch (error) {
194
    logger.error('Error listing linkers:', error);
1✔
195
    res.status(500).json({ message: 'Error listing linkers', error: error.message });
1✔
196
  }
197
};
198

199
export const getLinker = async (req, res) => {
78✔
200
  try {
4✔
201
    const userId = req.user?.user_id;
4✔
202
    const { id } = req.params;
4✔
203

204
    const linker = await models.Linker.findByPk(id);
4✔
205

206
    if (!checkLinkerOwnership(linker, userId)) {
3✔
207
      return res.status(404).json({ message: 'Linker not found or access denied' });
2✔
208
    }
209

210
    res.json(sanitizeLinker(linker, true));
1✔
211
  } catch (error) {
212
    logger.error('Error fetching linker:', error);
1✔
213
    res.status(500).json({ message: 'Error fetching linker', error: error.message });
1✔
214
  }
215
};
216

217
export const createLinker = async (req, res) => {
78✔
218
  try {
5✔
219
    const userId = req.user?.user_id;
5✔
220
    if (!userId) return res.status(401).json({ message: 'Unauthorized' });
5✔
221

222
    const { name, defaultMethodName, datasourceIds, datasourceConfigs, description, environment } = req.body;
4✔
223

224
    // Validate input
225
    const validation = validateLinkerInput({ datasourceIds, defaultMethodName, datasourceConfigs });
4✔
226
    if (!validation.isValid) {
4✔
227
      return res.status(400).json({ error: validation.errors.join(', ') });
3✔
228
    }
229

230
    // Validate that all datasource IDs exist and belong to the user
231
    const dsValidation = await validateDatasourcesExist(datasourceIds, models.Datasource, userId);
1✔
232
    if (!dsValidation.isValid) {
1!
UNCOV
233
      return res.status(400).json({ error: dsValidation.errors.join(', ') });
×
234
    }
235

236
    // Validate datasource configs structure (now mandatory)
237
    const configValidation = validateDatasourceConfigs(datasourceConfigs, datasourceIds);
1✔
238
    if (!configValidation.isValid) {
1!
NEW
239
      return res.status(400).json({ error: configValidation.errors.join(', ') });
×
240
    }
241

242
    // Normalize and check for existing linker with same name
243
    const normalizedName = name ? normalizeName(name) : null;
1!
244
    if (normalizedName) {
1!
245
      const existing = await models.Linker.findOne({
1✔
246
        where: { name: normalizedName, ownerId: userId }
247
      });
248
      if (existing) {
1!
249
        return res.status(409).json({ message: 'A linker with this name already exists.' });
1✔
250
      }
251
    }
252

253
    // Normalize datasource configs
254
    const normalizedConfigs = normalizeDatasourceConfigs(datasourceConfigs, datasourceIds);
×
255

256
    // Create linker
257
    const linkerData = {
×
258
      name: normalizedName,
259
      defaultMethodName: defaultMethodName || 'default',
×
260
      datasourceIds,
261
      datasourceConfigs: normalizedConfigs, // Always required, never null
262
      description: description || null,
×
263
      environment: environment || 'production',
×
264
      isActive: true,
265
      createdBy: req.user.username,
266
      version: 1,
267
      ownerId: userId,
268
    };
269

270
    const newLinker = await models.Linker.create(linkerData);
×
271

272
    // Execute linker immediately after creation and cache the results
273
    try {
×
274
      const { traceId, spanId } = generateCorrelationIds();
×
275
      const executionId = generateLinkerExecutionId(newLinker.id, Date.now());
×
276
      
277
      logger.info(`Executing linker immediately after creation: ${newLinker.id}`);
×
278
      
279
      const executionResult = await executeLinkerInternal(
×
280
        newLinker,
281
        userId,
282
        executionId,
283
        traceId,
284
        spanId,
285
        {}
286
      );
287

288
      // Update linker execution status
289
      await newLinker.update({
×
290
        executionStatus: executionResult.overallStatus,
291
        lastExecutedAt: new Date()
292
      });
293

294
      // Cache the execution result with 2 weeks expiration
295
      await cacheLinkerExecution(newLinker.id, executionResult.mergedData, {
×
296
        executionId,
297
        executionStatus: executionResult.overallStatus,
298
        executionSummary: executionResult.executionSummary,
299
        traceId,
300
        spanId
301
      });
302

303
      logger.info(`Linker ${newLinker.id} executed and cached successfully`, {
×
304
        linkerId: newLinker.id,
305
        executionStatus: executionResult.overallStatus
306
      });
307

308
    } catch (execError) {
309
      // Log error but don't fail the creation
310
      logger.error(`Error executing linker after creation: ${newLinker.id}`, execError);
×
311
      await newLinker.update({
×
312
        executionStatus: 'failure',
313
        lastExecutedAt: new Date()
314
      });
315
    }
316

317
    const response = sanitizeLinker(newLinker, true);
×
318
    res.status(201).json({ 
×
319
      message: 'Linker created successfully',
320
      datasourceCount: datasourceIds.length,
321
      ...response 
322
    });
323
  } catch (error) {
324
    logger.error('Error creating linker:', error);
×
325
    res.status(500).json({ message: 'Error creating linker', error: error.message });
×
326
  }
327
};
328

329
/**
330
 * Validate and prepare name update
331
 * @param {string} name - New name
332
 * @param {Object} linker - Current linker
333
 * @param {number} userId - User ID
334
 * @param {string} linkerId - Linker ID
335
 * @returns {Promise<Object>} Result with normalized name or error
336
 */
337
const validateNameUpdate = async (name, linker, userId, linkerId) => {
78✔
338
  const normalizedName = normalizeName(name);
2✔
339
  
340
  if (normalizedName === linker.name) {
2!
341
    return { normalizedName, isValid: true };
×
342
  }
343

344
  const existing = await models.Linker.findOne({
2✔
345
    where: { name: normalizedName, ownerId: userId }
346
  });
347

348
  if (existing && existing.id !== linkerId) {
2✔
349
    return {
1✔
350
      isValid: false,
351
      error: { status: 409, message: 'A linker with this name already exists.' }
352
    };
353
  }
354

355
  return { normalizedName, isValid: true };
1✔
356
};
357

358
/**
359
 * Process datasource IDs update
360
 * @param {Array<string>} datasourceIds - New datasource IDs
361
 * @param {Object} linker - Current linker
362
 * @param {number} userId - User ID
363
 * @returns {Promise<Object>} Update data and cache invalidation flag
364
 */
365
const processDatasourceIdsUpdate = async (datasourceIds, linker, userId) => {
78✔
366
  const dsValidation = await validateDatasourcesExist(datasourceIds, models.Datasource, userId);
3✔
367
  
368
  if (!dsValidation.isValid) {
3✔
369
    return {
1✔
370
      isValid: false,
371
      error: { status: 400, message: dsValidation.errors.join(', ') }
372
    };
373
  }
374

375
  return {
2✔
376
    isValid: true,
377
    updateData: {
378
      datasourceIds,
379
      version: linker.version + 1,
380
      executionStatus: 'not_executed'
381
    },
382
    shouldInvalidateCache: true
383
  };
384
};
385

386
/**
387
 * Process datasource configs update
388
 * @param {Object} datasourceConfigs - New datasource configs
389
 * @param {Array<string>} targetDatasourceIds - Target datasource IDs
390
 * @param {Object} linker - Current linker
391
 * @param {Object} currentUpdateData - Current update data
392
 * @returns {Object} Update data and cache invalidation flag
393
 */
394
const processDatasourceConfigsUpdate = (datasourceConfigs, targetDatasourceIds, linker, currentUpdateData) => {
78✔
395
  const configValidation = validateDatasourceConfigs(datasourceConfigs, targetDatasourceIds);
×
396
  
397
  if (!configValidation.isValid) {
×
398
    return {
×
399
      isValid: false,
400
      error: { status: 400, message: configValidation.errors.join(', ') }
401
    };
402
  }
403

404
  const updateData = {
×
405
    datasourceConfigs: normalizeDatasourceConfigs(datasourceConfigs, targetDatasourceIds)
406
  };
407

408
  if (currentUpdateData.version === undefined) {
×
409
    updateData.version = linker.version + 1;
×
410
  }
411

412
  return {
×
413
    isValid: true,
414
    updateData,
415
    shouldInvalidateCache: true
416
  };
417
};
418

419
/**
420
 * Build update data from request body
421
 * @param {Object} body - Request body
422
 * @param {Object} linker - Current linker
423
 * @param {number} userId - User ID
424
 * @param {string} linkerId - Linker ID
425
 * @returns {Promise<Object>} Update data and flags
426
 */
427
const buildUpdateData = async (body, linker, userId, linkerId) => {
78✔
428
  const { name, defaultMethodName, datasourceIds, datasourceConfigs, description, environment, isActive } = body;
6✔
429
  const updateData = {};
6✔
430
  let shouldInvalidateCache = false;
6✔
431

432
  // Handle name update
433
  if (name !== undefined) {
6✔
434
    const nameResult = await validateNameUpdate(name, linker, userId, linkerId);
2✔
435
    if (!nameResult.isValid) {
2✔
436
      return nameResult;
1✔
437
    }
438
    updateData.name = nameResult.normalizedName;
1✔
439
  }
440

441
  // Handle defaultMethodName update
442
  if (defaultMethodName !== undefined) {
5!
443
    updateData.defaultMethodName = defaultMethodName;
×
444
  }
445

446
  // Handle datasourceIds update
447
  if (datasourceIds !== undefined) {
5✔
448
    const dsResult = await processDatasourceIdsUpdate(datasourceIds, linker, userId);
3✔
449
    if (!dsResult.isValid) {
3✔
450
      return dsResult;
1✔
451
    }
452
    Object.assign(updateData, dsResult.updateData);
2✔
453
    shouldInvalidateCache = dsResult.shouldInvalidateCache;
2✔
454
  }
455

456
  // Handle datasourceConfigs update
457
  if (datasourceConfigs !== undefined) {
4!
NEW
458
    const targetDatasourceIds = datasourceIds || linker.datasourceIds;    
×
459
    // datasourceConfigs cannot be null or undefined
NEW
460
    if (datasourceConfigs === null) {
×
NEW
461
      return {
×
462
        isValid: false,
463
        error: 'datasourceConfigs is required and cannot be null'
464
      };
465
    }
466
    const configResult = processDatasourceConfigsUpdate(datasourceConfigs, targetDatasourceIds, linker, updateData);
×
467
    if (!configResult.isValid) {
×
468
      return configResult;
×
469
    }
470
    Object.assign(updateData, configResult.updateData);
×
471
    shouldInvalidateCache = shouldInvalidateCache || configResult.shouldInvalidateCache;
×
472
  }
473

474
  // Handle simple fields
475
  if (description !== undefined) {
4!
476
    updateData.description = description;
×
477
  }
478

479
  if (environment !== undefined) {
4!
480
    updateData.environment = environment;
×
481
  }
482

483
  if (isActive !== undefined) {
4!
484
    updateData.isActive = Boolean(isActive);
×
485
  }
486

487
  return {
4✔
488
    isValid: true,
489
    updateData,
490
    shouldInvalidateCache
491
  };
492
};
493

494
export const updateLinker = async (req, res) => {
78✔
495
  try {
8✔
496
    const userId = req.user?.user_id;
8✔
497
    const { id } = req.params;
8✔
498

499
    const linker = await models.Linker.findByPk(id);
8✔
500
    if (!checkLinkerOwnership(linker, userId)) {
8✔
501
      return res.status(404).json({ message: 'Linker not found or access denied' });
2✔
502
    }
503

504
    // Validate input
505
    const validation = validateLinkerUpdateInput(req.body);
6✔
506
    if (!validation.isValid) {
6!
507
      return res.status(400).json({ error: validation.errors.join(', ') });
×
508
    }
509

510
    // Build update data
511
    const result = await buildUpdateData(req.body, linker, userId, id);
6✔
512
    if (!result.isValid) {
6✔
513
      return res.status(result.error.status).json({ message: result.error.message });
2✔
514
    }
515

516
    if (Object.keys(result.updateData).length === 0) {
4✔
517
      return res.status(400).json({ message: 'No valid fields provided for update.' });
1✔
518
    }
519

520
    result.updateData.updatedAt = new Date();
3✔
521

522
    await linker.update(result.updateData);
3✔
523

524
    // Invalidate cache if needed
525
    if (result.shouldInvalidateCache) {
3✔
526
      await invalidateLinkerCache(linker.id);
2✔
527
      logger.info(`Invalidated cache for linker ${linker.id} due to configuration changes`);
2✔
528
    }
529

530
    res.json({ 
3✔
531
      message: 'Linker updated successfully', 
532
      ...sanitizeLinker(linker, true) 
533
    });
534
  } catch (error) {
535
    logger.error('Error updating linker:', error);
×
536
    res.status(500).json({ message: 'Error updating linker', error: error.message });
×
537
  }
538
};
539

540
export const deleteLinker = async (req, res) => {
78✔
541
  try {
4✔
542
    const userId = req.user?.user_id;
4✔
543
    const { id } = req.params;
4✔
544

545
    const linker = await models.Linker.findByPk(id);
4✔
546
    if (!checkLinkerOwnership(linker, userId)) {
3✔
547
      return res.status(404).json({ message: 'Linker not found or access denied' });
2✔
548
    }
549

550
    // Invalidate cache before deleting
551
    await invalidateLinkerCache(linker.id);
1✔
552
    logger.info(`Invalidated cache for linker ${linker.id} before deletion`);
1✔
553

554
    await linker.destroy();
1✔
555
    res.status(204).send();
1✔
556
  } catch (error) {
557
    logger.error('Error deleting linker:', error);
1✔
558
    res.status(500).json({ message: 'Error deleting linker', error: error.message });
1✔
559
  }
560
};
561

562
export const executeLinker = async (req, res) => {
78✔
563
  // Generate trace-like IDs for correlation
564
  const { traceId, spanId } = generateCorrelationIds();
×
565
  
566
  try {
×
567
    const userId = req.user?.user_id;
×
568
    const { id } = req.params;
×
569
    const { options = {}, mergeStrategy = 'indexed' } = req.body;
×
570

571
    const linker = await models.Linker.findByPk(id);
×
572
    if (!checkLinkerOwnership(linker, userId)) {
×
573
      return res.status(404).json({ message: 'Linker not found or access denied' });
×
574
    }
575

576
    // Capture execution start time
577
    const executionStartTime = Date.now();
×
578
    const executionId = generateLinkerExecutionId(linker.id, executionStartTime);
×
579

580
    logger.debug(`[${executionId}] Starting linker execution`, {
×
581
      linkerId: linker.id,
582
      linkerName: linker.name,
583
      datasourceCount: linker.datasourceIds.length,
584
      userId,
585
      requestId: req.requestId || 'unknown',
×
586
      traceId,
587
      spanId,
588
      mergeStrategy
589
    });
590

591
    // Check cache first
592
    const cachedResult = await getCachedLinkerExecution(linker.id);
×
593
    
594
    // If cache is valid (exists and not stale - less than 1 hour old), use it
595
    if (cachedResult.data && !cachedResult.isStale) {
×
596
      logger.info(`Using cached data for linker ${linker.id}`, {
×
597
        linkerId: linker.id,
598
        cacheAge: cachedResult.cacheAge,
599
        cacheAgeMinutes: Math.floor(cachedResult.cacheAge / 60000)
600
      });
601

602
      // Always use indexed merge strategy for cached data
603
      const cachedMergedData = cachedResult.data;
×
604

605
      // Create telemetry context
606
      const telemetryContext = createTelemetryContext({
×
607
        traceId,
608
        spanId,
609
        operationName: `linker.execute.cached.${linker.id}`,
610
        correlationId: executionId
611
      });
612

613
      return res.json({
×
614
        message: 'Linker executed from cache',
615
        linkerId: linker.id,
616
        linkerName: linker.name,
617
        executionStatus: 'success',
618
        mergeStrategy: 'indexed',
619
        mergedData: cachedMergedData,
620
        fromCache: true,
621
        cacheAge: cachedResult.cacheAge,
622
        cacheAgeMinutes: Math.floor(cachedResult.cacheAge / 60000),
623
        cachedMetadata: cachedResult.metadata,
624
        telemetryContext
625
      });
626
    }
627

628
    // If cache is stale or doesn't exist, delete old cache and execute fresh
629
    if (cachedResult.data && cachedResult.isStale) {
×
630
      logger.info(`Cache is stale for linker ${linker.id}, invalidating and re-executing`, {
×
631
        linkerId: linker.id,
632
        cacheAge: cachedResult.cacheAge,
633
        cacheAgeMinutes: Math.floor(cachedResult.cacheAge / 60000)
634
      });
635
      await invalidateLinkerCache(linker.id);
×
636
    }
637

638
    // Update execution status to pending
639
    await linker.update({ 
×
640
      executionStatus: 'pending',
641
      lastExecutedAt: new Date()
642
    });
643

644
    // Execute linker using internal function
645
    const executionResult = await executeLinkerInternal(
×
646
      linker,
647
      userId,
648
      executionId,
649
      traceId,
650
      spanId,
651
      options
652
    );
653

654
    // Update linker execution status
655
    await linker.update({ 
×
656
      executionStatus: executionResult.overallStatus,
657
      lastExecutedAt: new Date()
658
    });
659

660
    // Cache the execution result with 2 weeks expiration (always using indexed strategy)
661
    await cacheLinkerExecution(linker.id, executionResult.mergedData, {
×
662
      executionId,
663
      executionStatus: executionResult.overallStatus,
664
      executionSummary: executionResult.executionSummary,
665
      traceId,
666
      spanId
667
    });
668

669
    logger.info(`Linker ${linker.id} executed and cached successfully`, {
×
670
      linkerId: linker.id,
671
      executionStatus: executionResult.overallStatus
672
    });
673

674
    // User requested merge strategy may differ from cached (indexed) strategy
675
    // Apply the user's requested merge strategy to the detailed results
676
    const userMergedData = mergeStrategy === 'indexed' 
×
677
      ? executionResult.mergedData 
678
      : mergeDatasourceResults(executionResult.detailedResults, mergeStrategy);
679

680
    // Create telemetry context
681
    const telemetryContext = createTelemetryContext({
×
682
      traceId,
683
      spanId,
684
      operationName: `linker.execute.${linker.id}`,
685
      correlationId: executionId
686
    });
687

688
    res.json({
×
689
      message: `Linker executed with status: ${executionResult.overallStatus}`,
690
      linkerId: linker.id,
691
      linkerName: linker.name,
692
      executionStatus: executionResult.overallStatus,
693
      mergeStrategy,
694
      mergedData: userMergedData,
695
      fromCache: false,
696
      executionMetadata: executionResult.executionMetadata,
697
      executionSummary: executionResult.executionSummary,
698
      detailedResults: executionResult.detailedResults,
699
      telemetryContext
700
    });
701

702
  } catch (error) {
703
    logger.error('Error executing linker:', error);
×
704
    
705
    // Update linker status to failure
706
    try {
×
707
      const linker = await models.Linker.findByPk(req.params.id);
×
708
      if (linker) {
×
709
        await linker.update({ executionStatus: 'failure' });
×
710
      }
711
    } catch (updateError) {
712
      logger.error('Error updating linker status after failure:', updateError);
×
713
    }
714

715
    // Create error telemetry context
716
    const telemetryContext = createTelemetryContext({
×
717
      traceId,
718
      spanId,
719
      operationName: 'linker.execute.error',
720
      correlationId: `error_${Date.now()}`
721
    });
722

723
    res.status(500).json({ 
×
724
      message: 'Error executing linker', 
725
      error: error.message,
726
      linkerId: req.params.id,
727
      telemetryContext
728
    });
729
  }
730
};
731

732
export const getLinkerDatasources = async (req, res) => {
78✔
733
  try {
5✔
734
    const userId = req.user?.user_id;
5✔
735
    const { id } = req.params;
5✔
736

737
    const linker = await models.Linker.findByPk(id);
5✔
738
    if (!checkLinkerOwnership(linker, userId)) {
5✔
739
      return res.status(404).json({ message: 'Linker not found or access denied' });
2✔
740
    }
741

742
    // Fetch all datasources
743
    const datasources = await models.Datasource.findAll({
3✔
744
      where: {
745
        id: linker.datasourceIds,
746
        ownerId: userId
747
      }
748
    });
749

750
    // Map datasources with their configs from the linker
751
    const datasourcesWithConfigs = datasources.map(ds => {
2✔
752
      const config = linker.datasourceConfigs?.[ds.id];
2✔
753
      return {
2✔
754
        id: ds.id,
755
        name: ds.name,
756
        definitionId: ds.definitionId,
757
        description: ds.description,
758
        environment: ds.environment,
759
        isActive: ds.isActive,
760
        testStatus: ds.testStatus,
761
        linkerConfig: config || null
3✔
762
      };
763
    });
764

765
    res.json({
2✔
766
      linkerId: linker.id,
767
      linkerName: linker.name,
768
      datasourceCount: datasources.length,
769
      datasources: datasourcesWithConfigs
770
    });
771

772
  } catch (error) {
773
    logger.error('Error getting linker datasources:', error);
1✔
774
    res.status(500).json({ 
1✔
775
      message: 'Error getting linker datasources', 
776
      error: error.message 
777
    });
778
  }
779
};
780

781
export { datasourceCatalog };
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc