• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Permify / permify / 18709238527

22 Oct 2025 07:50AM UTC coverage: 85.968%. Remained the same
18709238527

Pull #2560

github

tolgaozen
docs: update API version to v1.4.6 across documentation and code files
Pull Request #2560: docs: update API version to v1.4.6 across documentation and code files

9423 of 10961 relevant lines covered (85.97%)

298.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.83
/internal/engines/bulk.go
1
package engines
2

3
import (
4
        "context"
5
        "fmt"
6
        "sort"
7
        "sync"
8
        "sync/atomic"
9

10
        "github.com/pkg/errors"
11

12
        "golang.org/x/sync/errgroup"
13
        "golang.org/x/sync/semaphore"
14

15
        "github.com/Permify/permify/internal/invoke"
16
        "github.com/Permify/permify/internal/storage/memory/utils"
17
        base "github.com/Permify/permify/pkg/pb/base/v1"
18
)
19

20
// BulkCheckerType defines the type of bulk checking operation.
21
// This enum determines how requests are sorted and processed.
22
type BulkCheckerType string
23

24
const (
25
        // BulkCheckerTypeSubject indicates that requests should be sorted and processed by subject ID
26
        BulkCheckerTypeSubject BulkCheckerType = "subject"
27
        // BulkCheckerTypeEntity indicates that requests should be sorted and processed by entity ID
28
        BulkCheckerTypeEntity BulkCheckerType = "entity"
29
)
30

31
// BulkCheckerRequest represents a permission check request with optional pre-computed result.
32
// This struct encapsulates both the permission check request and an optional pre-determined result,
33
// allowing for optimization when results are already known (e.g., from caching).
34
type BulkCheckerRequest struct {
35
        // Request contains the actual permission check request
36
        Request *base.PermissionCheckRequest
37
        // Result holds a pre-computed result if available, otherwise CHECK_RESULT_UNSPECIFIED
38
        Result base.CheckResult
39
}
40

41
// BulkCheckerConfig holds configuration parameters for the BulkChecker.
42
// This struct allows for fine-tuning the behavior and performance characteristics
43
// of the bulk permission checking system.
44
type BulkCheckerConfig struct {
45
        // ConcurrencyLimit defines the maximum number of concurrent permission checks
46
        // that can be processed simultaneously. Higher values increase throughput
47
        // but may consume more system resources.
48
        ConcurrencyLimit int
49
        // BufferSize defines the size of the internal request buffer.
50
        // This should be set based on expected request volume to avoid blocking.
51
        BufferSize int
52
}
53

54
// DefaultBulkCheckerConfig returns a sensible default configuration
55
// that balances performance and resource usage for most use cases.
56
func DefaultBulkCheckerConfig() BulkCheckerConfig {
3✔
57
        return BulkCheckerConfig{
3✔
58
                ConcurrencyLimit: 10,   // Moderate concurrency for most workloads
3✔
59
                BufferSize:       1000, // Buffer for 1000 requests
3✔
60
        }
3✔
61
}
3✔
62

63
// BulkChecker handles concurrent permission checks with ordered result processing.
64
// This struct implements a high-performance bulk permission checking system that:
65
// - Collects permission check requests asynchronously
66
// - Processes them concurrently with controlled parallelism
67
// - Maintains strict ordering of results based on request sorting
68
// - Provides efficient resource management and error handling
69
type BulkChecker struct {
70
        // typ determines the sorting strategy and processing behavior
71
        typ BulkCheckerType
72
        // checker is the underlying permission checking engine
73
        checker invoke.Check
74
        // config holds the operational configuration
75
        config BulkCheckerConfig
76
        // ctx provides context for cancellation and timeout management
77
        ctx context.Context
78
        // cancel allows for graceful shutdown of all operations
79
        cancel context.CancelFunc
80

81
        // Request handling components
82
        // requestChan is the input channel for receiving permission check requests
83
        requestChan chan BulkCheckerRequest
84
        // requests stores all collected requests before processing
85
        requests []BulkCheckerRequest
86
        // requestsMu provides thread-safe access to the requests slice
87
        requestsMu sync.RWMutex
88

89
        // Execution state management
90
        // executionState tracks the progress and results of request processing
91
        executionState *executionState
92
        // collectionDone signals when request collection has completed
93
        collectionDone chan struct{}
94

95
        // Callback for processing results
96
        // callback is invoked for each successful permission check with the entity/subject ID and continuous token
97
        callback func(entityID, continuousToken string)
98
}
99

100
// executionState manages the execution of requests and maintains processing order.
101
// This struct ensures that results are processed in the correct sequence
102
// even when requests complete out of order due to concurrent processing.
103
type executionState struct {
104
        // mu protects access to the execution state
105
        mu sync.Mutex
106
        // results stores the results of all requests in their original order
107
        results []base.CheckResult
108
        // processedIndex tracks the next result to be processed in order
109
        processedIndex int
110
        // successCount tracks the number of successful permission checks
111
        successCount int64
112
        // limit defines the maximum number of successful results to process
113
        limit int64
114
}
115

116
// NewBulkChecker creates a new BulkChecker instance with comprehensive validation and error handling.
117
// This constructor ensures that all dependencies are properly initialized and validates
118
// configuration parameters to prevent runtime errors.
119
//
120
// Parameters:
121
//   - ctx: Context for managing the lifecycle of the BulkChecker
122
//   - checker: The permission checking engine to use for actual permission checks
123
//   - typ: The type of bulk checking operation (entity or subject)
124
//   - callback: Function called for each successful permission check
125
//   - config: Configuration parameters for tuning performance and behavior
126
//
127
// Returns:
128
//   - *BulkChecker: The initialized BulkChecker instance
129
//   - error: Any error that occurred during initialization
130
func NewBulkChecker(ctx context.Context, checker invoke.Check, typ BulkCheckerType, callback func(entityID, ct string), config BulkCheckerConfig) (*BulkChecker, error) {
156✔
131
        // Validate all required parameters
156✔
132
        if ctx == nil {
157✔
133
                return nil, fmt.Errorf("context cannot be nil")
1✔
134
        }
1✔
135
        if checker == nil {
157✔
136
                return nil, fmt.Errorf("checker cannot be nil")
2✔
137
        }
2✔
138
        if callback == nil {
154✔
139
                return nil, fmt.Errorf("callback cannot be nil")
1✔
140
        }
1✔
141

142
        // Apply default values for invalid configuration
143
        if config.ConcurrencyLimit <= 0 {
153✔
144
                config.ConcurrencyLimit = DefaultBulkCheckerConfig().ConcurrencyLimit
1✔
145
        }
1✔
146
        if config.BufferSize <= 0 {
153✔
147
                config.BufferSize = DefaultBulkCheckerConfig().BufferSize
1✔
148
        }
1✔
149

150
        // Create a cancellable context for the BulkChecker
151
        ctx, cancel := context.WithCancel(ctx)
152✔
152

152✔
153
        // Initialize the BulkChecker with all components
152✔
154
        bc := &BulkChecker{
152✔
155
                typ:            typ,
152✔
156
                checker:        checker,
152✔
157
                config:         config,
152✔
158
                ctx:            ctx,
152✔
159
                cancel:         cancel,
152✔
160
                requestChan:    make(chan BulkCheckerRequest, config.BufferSize),
152✔
161
                requests:       make([]BulkCheckerRequest, 0, config.BufferSize),
152✔
162
                callback:       callback,
152✔
163
                collectionDone: make(chan struct{}),
152✔
164
        }
152✔
165

152✔
166
        // Start the background request collection goroutine
152✔
167
        go bc.collectRequests()
152✔
168

152✔
169
        return bc, nil
152✔
170
}
171

172
// collectRequests safely collects requests until the channel is closed.
173
// This method runs in a separate goroutine and continuously processes
174
// incoming requests until either the channel is closed or the context is cancelled.
175
// It ensures thread-safe addition of requests to the internal collection.
176
func (bc *BulkChecker) collectRequests() {
152✔
177
        // Signal completion when this goroutine exits
152✔
178
        defer close(bc.collectionDone)
152✔
179

152✔
180
        for {
1,451✔
181
                select {
1,299✔
182
                case req, ok := <-bc.requestChan:
1,285✔
183
                        if !ok {
1,423✔
184
                                // Channel closed, stop collecting
138✔
185
                                return
138✔
186
                        }
138✔
187
                        bc.addRequest(req)
1,147✔
188
                case <-bc.ctx.Done():
4✔
189
                        // Context cancelled, stop collecting
4✔
190
                        return
4✔
191
                }
192
        }
193
}
194

195
// addRequest safely adds a request to the internal list.
196
// This method uses a mutex to ensure thread-safe access to the requests slice,
197
// preventing race conditions when multiple goroutines are adding requests.
198
func (bc *BulkChecker) addRequest(req BulkCheckerRequest) {
1,147✔
199
        bc.requestsMu.Lock()
1,147✔
200
        defer bc.requestsMu.Unlock()
1,147✔
201
        bc.requests = append(bc.requests, req)
1,147✔
202
}
1,147✔
203

204
// StopCollectingRequests safely stops request collection and waits for completion.
205
// This method closes the input channel and waits for the collection goroutine
206
// to finish processing any remaining requests. This ensures that no requests
207
// are lost during shutdown.
208
func (bc *BulkChecker) StopCollectingRequests() {
138✔
209
        close(bc.requestChan)
138✔
210
        <-bc.collectionDone // Wait for collection to complete
138✔
211
}
138✔
212

213
// getSortedRequests returns a sorted copy of requests based on the checker type.
214
// This method creates a copy of the requests to avoid modifying the original
215
// collection and sorts them according to the BulkCheckerType (entity ID or subject ID).
216
// The sorting ensures consistent and predictable result ordering.
217
func (bc *BulkChecker) getSortedRequests() []BulkCheckerRequest {
675✔
218
        bc.requestsMu.RLock()
675✔
219
        defer bc.requestsMu.RUnlock()
675✔
220

675✔
221
        // Create a copy to avoid modifying the original
675✔
222
        requests := make([]BulkCheckerRequest, len(bc.requests))
675✔
223
        copy(requests, bc.requests)
675✔
224

675✔
225
        // Sort the copy based on the checker type
675✔
226
        bc.sortRequests(requests)
675✔
227
        return requests
675✔
228
}
675✔
229

230
// sortRequests sorts requests based on the checker type.
231
// This method implements different sorting strategies:
232
// - For entity-based checks: sorts by entity ID
233
// - For subject-based checks: sorts by subject ID
234
// The sorting ensures that results are processed in a consistent order.
235
func (bc *BulkChecker) sortRequests(requests []BulkCheckerRequest) {
677✔
236
        switch bc.typ {
677✔
237
        case BulkCheckerTypeEntity:
676✔
238
                sort.Slice(requests, func(i, j int) bool {
127,314✔
239
                        return requests[i].Request.GetEntity().GetId() < requests[j].Request.GetEntity().GetId()
126,638✔
240
                })
126,638✔
241
        case BulkCheckerTypeSubject:
1✔
242
                sort.Slice(requests, func(i, j int) bool {
4✔
243
                        return requests[i].Request.GetSubject().GetId() < requests[j].Request.GetSubject().GetId()
3✔
244
                })
3✔
245
        }
246
}
247

248
// ExecuteRequests processes requests concurrently with comprehensive error handling and resource management.
249
// This method is the main entry point for bulk permission checking. It:
250
// 1. Stops collecting new requests
251
// 2. Sorts all collected requests
252
// 3. Processes them concurrently with controlled parallelism
253
// 4. Maintains strict ordering of results
254
// 5. Handles errors gracefully and manages resources properly
255
//
256
// Parameters:
257
//   - size: The maximum number of successful results to process
258
//
259
// Returns:
260
//   - error: Any error that occurred during processing (context cancellation is not considered an error)
261
func (bc *BulkChecker) ExecuteRequests(size uint32) error { // Main execution entry point
139✔
262
        if size == 0 {
140✔
263
                return fmt.Errorf("size must be greater than 0")
1✔
264
        }
1✔
265
        // Stop collecting new requests and wait for collection to complete
266
        bc.StopCollectingRequests() // Ensure no new requests are added
138✔
267

138✔
268
        // Get sorted requests for processing
138✔
269
        requests := bc.getSortedRequests()
138✔
270
        if len(requests) == 0 {
147✔
271
                return nil // No requests to process
9✔
272
        }
9✔
273

274
        // Initialize execution state for tracking progress
275
        bc.executionState = &executionState{
129✔
276
                results: make([]base.CheckResult, len(requests)),
129✔
277
                limit:   int64(size),
129✔
278
        }
129✔
279

129✔
280
        // Create execution context with cancellation for graceful shutdown
129✔
281
        execCtx, execCancel := context.WithCancel(bc.ctx)
129✔
282
        defer execCancel()
129✔
283

129✔
284
        // Create semaphore to control concurrency
129✔
285
        sem := semaphore.NewWeighted(int64(bc.config.ConcurrencyLimit))
129✔
286

129✔
287
        // Create error group for managing goroutines and error propagation
129✔
288
        g, ctx := errgroup.WithContext(execCtx)
129✔
289

129✔
290
        // Process requests concurrently
129✔
291
        for i, req := range requests {
1,275✔
292
                // Check if we've reached the success limit
1,146✔
293
                if atomic.LoadInt64(&bc.executionState.successCount) >= int64(size) {
1,146✔
294
                        break
×
295
                }
296

297
                index := i
1,146✔
298
                request := req
1,146✔
299

1,146✔
300
                // Launch goroutine for each request
1,146✔
301
                g.Go(func() error {
2,292✔
302
                        return bc.processRequest(ctx, sem, index, request)
1,146✔
303
                })
1,146✔
304
        }
305

306
        // Wait for all goroutines to complete and handle any errors
307
        if err := g.Wait(); err != nil {
129✔
308
                if isContextError(err) {
×
309
                        return nil // Context cancellation is not an error
×
310
                }
×
311
                return fmt.Errorf("bulk execution failed: %w", err)
×
312
        }
313

314
        return nil
129✔
315
}
316

317
// processRequest handles a single request with comprehensive error handling.
318
// This method is executed in a separate goroutine for each request and:
319
// 1. Acquires a semaphore slot to control concurrency
320
// 2. Performs the permission check or uses pre-computed result
321
// 3. Processes the result in the correct order
322
// 4. Handles context cancellation and other errors gracefully
323
//
324
// Parameters:
325
//   - ctx: Context for cancellation and timeout
326
//   - sem: Semaphore for concurrency control
327
//   - index: The index of this request in the sorted list
328
//   - req: The permission check request to process
329
//
330
// Returns:
331
//   - error: Any error that occurred during processing
332
func (bc *BulkChecker) processRequest(ctx context.Context, sem *semaphore.Weighted, index int, req BulkCheckerRequest) error {
1,146✔
333
        // Check context before acquiring semaphore
1,146✔
334
        if err := ctx.Err(); err != nil {
1,146✔
335
                return nil
×
336
        }
×
337

338
        // Acquire semaphore slot to control concurrency
339
        if err := sem.Acquire(ctx, 1); err != nil {
1,146✔
340
                if isContextError(err) {
×
341
                        return nil
×
342
                }
×
343
                return fmt.Errorf("failed to acquire semaphore: %w", err)
×
344
        }
345
        defer sem.Release(1)
1,146✔
346

1,146✔
347
        // Determine the result for this request
1,146✔
348
        result, err := bc.getRequestResult(ctx, req)
1,146✔
349
        if err != nil {
1,146✔
350
                if isContextError(err) {
×
351
                        return nil
×
352
                }
×
353
                return fmt.Errorf("failed to get request result: %w", err)
×
354
        }
355

356
        // Process the result in the correct order
357
        return bc.processResult(index, result)
1,146✔
358
}
359

360
// getRequestResult determines the result for a request.
361
// This method either uses a pre-computed result if available,
362
// or performs the actual permission check using the underlying checker.
363
//
364
// Parameters:
365
//   - ctx: Context for the permission check
366
//   - req: The request to get the result for
367
//
368
// Returns:
369
//   - base.CheckResult: The result of the permission check
370
//   - error: Any error that occurred during the check
371
func (bc *BulkChecker) getRequestResult(ctx context.Context, req BulkCheckerRequest) (base.CheckResult, error) {
1,148✔
372
        // Use pre-computed result if available
1,148✔
373
        if req.Result != base.CheckResult_CHECK_RESULT_UNSPECIFIED {
1,149✔
374
                return req.Result, nil
1✔
375
        }
1✔
376

377
        // Perform the actual permission check
378
        response, err := bc.checker.Check(ctx, req.Request)
1,147✔
379
        if err != nil {
1,148✔
380
                return base.CheckResult_CHECK_RESULT_UNSPECIFIED, err
1✔
381
        }
1✔
382

383
        return response.GetCan(), nil
1,146✔
384
}
385

386
// processResult processes a single result with thread-safe state updates.
387
// This method ensures that results are processed in the correct order
388
// even when requests complete out of order due to concurrent processing.
389
// It maintains the execution state and calls the callback for successful results.
390
//
391
// Parameters:
392
//   - index: The index of the result in the sorted list
393
//   - result: The result of the permission check
394
//
395
// Returns:
396
//   - error: Any error that occurred during processing
397
func (bc *BulkChecker) processResult(index int, result base.CheckResult) error {
1,146✔
398
        bc.executionState.mu.Lock()
1,146✔
399
        defer bc.executionState.mu.Unlock()
1,146✔
400

1,146✔
401
        // Store the result at the correct index
1,146✔
402
        bc.executionState.results[index] = result
1,146✔
403

1,146✔
404
        // Process results in order, starting from the current processed index
1,146✔
405
        for bc.executionState.processedIndex < len(bc.executionState.results) {
2,762✔
406
                currentResult := bc.executionState.results[bc.executionState.processedIndex]
1,616✔
407
                if currentResult == base.CheckResult_CHECK_RESULT_UNSPECIFIED {
2,458✔
408
                        break // Wait for this result to be computed
842✔
409
                }
410

411
                // Process successful results
412
                if currentResult == base.CheckResult_CHECK_RESULT_ALLOWED {
1,503✔
413
                        // Check if we've reached the success limit
729✔
414
                        if atomic.LoadInt64(&bc.executionState.successCount) >= bc.executionState.limit {
921✔
415
                                return nil
192✔
416
                        }
192✔
417

418
                        // Increment success count and call callback
419
                        atomic.AddInt64(&bc.executionState.successCount, 1)
537✔
420
                        bc.callbackWithToken(bc.executionState.processedIndex)
537✔
421
                }
422

423
                // Move to the next result
424
                bc.executionState.processedIndex++
582✔
425
        }
426

427
        return nil
954✔
428
}
429

430
// callbackWithToken calls the callback with the appropriate entity/subject ID and continuous token.
431
// This method retrieves the correct request from the sorted list and generates
432
// the appropriate continuous token for pagination support.
433
//
434
// Parameters:
435
//   - index: The index of the result in the sorted list
436
func (bc *BulkChecker) callbackWithToken(index int) {
537✔
437
        requests := bc.getSortedRequests()
537✔
438

537✔
439
        // Validate index bounds
537✔
440
        if index >= len(requests) {
537✔
441
                return
×
442
        }
×
443

444
        var id string
537✔
445
        var ct string
537✔
446

537✔
447
        // Extract ID and generate continuous token based on checker type
537✔
448
        switch bc.typ {
537✔
449
        case BulkCheckerTypeEntity:
537✔
450
                id = requests[index].Request.GetEntity().GetId()
537✔
451
                if index+1 < len(requests) {
971✔
452
                        ct = utils.NewContinuousToken(requests[index+1].Request.GetEntity().GetId()).Encode().String()
434✔
453
                }
434✔
454
        case BulkCheckerTypeSubject:
×
455
                id = requests[index].Request.GetSubject().GetId()
×
456
                if index+1 < len(requests) {
×
457
                        ct = utils.NewContinuousToken(requests[index+1].Request.GetSubject().GetId()).Encode().String()
×
458
                }
×
459
        }
460

461
        // Call the user-provided callback
462
        bc.callback(id, ct)
537✔
463
}
464

465
// Close properly cleans up resources and cancels all operations.
466
// This method should be called when the BulkChecker is no longer needed
467
// to ensure proper resource cleanup and prevent goroutine leaks.
468
//
469
// Returns:
470
//   - error: Any error that occurred during cleanup
471
func (bc *BulkChecker) Close() error {
138✔
472
        bc.cancel()
138✔
473
        return nil
138✔
474
}
138✔
475

476
// BulkEntityPublisher handles entity-based permission check publishing.
477
// This struct provides a convenient interface for publishing entity permission
478
// check requests to a BulkChecker instance.
479
type BulkEntityPublisher struct {
480
        // bulkChecker is the target BulkChecker for publishing requests
481
        bulkChecker *BulkChecker
482
        // request contains the base lookup request parameters
483
        request *base.PermissionLookupEntityRequest
484
}
485

486
// NewBulkEntityPublisher creates a new BulkEntityPublisher instance.
487
// This constructor initializes a publisher for entity-based permission checks.
488
//
489
// Parameters:
490
//   - ctx: Context for the publisher (currently unused but kept for API consistency)
491
//   - request: The base lookup request containing common parameters
492
//   - bulkChecker: The BulkChecker instance to publish to
493
//
494
// Returns:
495
//   - *BulkEntityPublisher: The initialized publisher instance
496
func NewBulkEntityPublisher(ctx context.Context, request *base.PermissionLookupEntityRequest, bulkChecker *BulkChecker) *BulkEntityPublisher {
139✔
497
        return &BulkEntityPublisher{
139✔
498
                bulkChecker: bulkChecker,
139✔
499
                request:     request,
139✔
500
        }
139✔
501
}
139✔
502

503
// Publish sends an entity permission check request to the bulk checker.
504
// This method creates a permission check request from the provided parameters
505
// and sends it to the BulkChecker for processing. It handles context cancellation
506
// gracefully by dropping requests when the context is done.
507
//
508
// Parameters:
509
//   - entity: The entity to check permissions for
510
//   - metadata: Metadata for the permission check request
511
//   - context: Additional context for the permission check
512
//   - result: Optional pre-computed result
513
func (p *BulkEntityPublisher) Publish(entity *base.Entity, metadata *base.PermissionCheckRequestMetadata, context *base.Context, result base.CheckResult) {
1,147✔
514
        select {
1,147✔
515
        case p.bulkChecker.requestChan <- BulkCheckerRequest{
516
                Request: &base.PermissionCheckRequest{
517
                        TenantId:   p.request.GetTenantId(),
518
                        Metadata:   metadata,
519
                        Entity:     entity,
520
                        Permission: p.request.GetPermission(),
521
                        Subject:    p.request.GetSubject(),
522
                        Context:    context,
523
                },
524
                Result: result,
525
        }:
1,147✔
526
        case <-p.bulkChecker.ctx.Done():
×
527
                // Context cancelled, drop the request
528
        }
529
}
530

531
// BulkSubjectPublisher handles subject-based permission check publishing.
532
// This struct provides a convenient interface for publishing subject permission
533
// check requests to a BulkChecker instance.
534
type BulkSubjectPublisher struct {
535
        // bulkChecker is the target BulkChecker for publishing requests
536
        bulkChecker *BulkChecker
537
        // request contains the base lookup request parameters
538
        request *base.PermissionLookupSubjectRequest
539
}
540

541
// NewBulkSubjectPublisher creates a new BulkSubjectPublisher instance.
542
// This constructor initializes a publisher for subject-based permission checks.
543
//
544
// Parameters:
545
//   - ctx: Context for the publisher (currently unused but kept for API consistency)
546
//   - request: The base lookup request containing common parameters
547
//   - bulkChecker: The BulkChecker instance to publish to
548
//
549
// Returns:
550
//   - *BulkSubjectPublisher: The initialized publisher instance
551
func NewBulkSubjectPublisher(ctx context.Context, request *base.PermissionLookupSubjectRequest, bulkChecker *BulkChecker) *BulkSubjectPublisher {
1✔
552
        return &BulkSubjectPublisher{
1✔
553
                bulkChecker: bulkChecker,
1✔
554
                request:     request,
1✔
555
        }
1✔
556
}
1✔
557

558
// Publish sends a subject permission check request to the bulk checker.
559
// This method creates a permission check request from the provided parameters
560
// and sends it to the BulkChecker for processing. It handles context cancellation
561
// gracefully by dropping requests when the context is done.
562
//
563
// Parameters:
564
//   - subject: The subject to check permissions for
565
//   - metadata: Metadata for the permission check request
566
//   - context: Additional context for the permission check
567
//   - result: Optional pre-computed result
568
func (p *BulkSubjectPublisher) Publish(subject *base.Subject, metadata *base.PermissionCheckRequestMetadata, context *base.Context, result base.CheckResult) {
1✔
569
        select {
1✔
570
        case p.bulkChecker.requestChan <- BulkCheckerRequest{
571
                Request: &base.PermissionCheckRequest{
572
                        TenantId:   p.request.GetTenantId(),
573
                        Metadata:   metadata,
574
                        Entity:     p.request.GetEntity(),
575
                        Permission: p.request.GetPermission(),
576
                        Subject:    subject,
577
                        Context:    context,
578
                },
579
                Result: result,
580
        }:
×
581
        case <-p.bulkChecker.ctx.Done():
1✔
582
                // Context cancelled, drop the request
583
        }
584
}
585

586
// isContextError checks if an error is related to context cancellation or timeout.
587
// This helper function centralizes the logic for identifying context-related errors
588
// that should not be treated as actual errors in the bulk processing system.
589
//
590
// Parameters:
591
//   - err: The error to check
592
//
593
// Returns:
594
//   - bool: True if the error is context-related, false otherwise
595
func isContextError(err error) bool {
6✔
596
        return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
6✔
597
}
6✔
598

599
// IsContextRelatedError is a legacy function maintained for backward compatibility.
600
// This function provides the same functionality as isContextError but with
601
// the original signature to maintain compatibility with existing code.
602
//
603
// Parameters:
604
//   - ctx: Context (unused, kept for compatibility)
605
//   - err: The error to check
606
//
607
// Returns:
608
//   - bool: True if the error is context-related, false otherwise
609
func IsContextRelatedError(ctx context.Context, err error) bool {
3✔
610
        return isContextError(err)
3✔
611
}
3✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc