• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vocdoni / saas-backend / 16301338771

15 Jul 2025 06:29PM UTC coverage: 56.793% (+0.7%) from 56.082%
16301338771

Pull #165

github

emmdim
refactor:  census creation

- Removed the PublishedCensus type and added it as Census parameter.
- Introduced new OrgMemberAuthFields defining the data options for member authentication.
- Added the `CheckOrgMemberAuthFields` function that checks a set of members and given auth fields empties an
d duplicates
- Add the option to create a census through the api based on a given group
Pull Request #165: Implements group based census creation

250 of 399 new or added lines in 9 files covered. (62.66%)

4 existing lines in 3 files now uncovered.

5104 of 8987 relevant lines covered (56.79%)

25.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.98
/db/census_participant.go
1
package db
2

3
import (
4
        "context"
5
        "fmt"
6
        "time"
7

8
        "github.com/vocdoni/saas-backend/internal"
9
        "go.mongodb.org/mongo-driver/bson"
10
        "go.mongodb.org/mongo-driver/bson/primitive"
11
        "go.mongodb.org/mongo-driver/mongo"
12
        "go.mongodb.org/mongo-driver/mongo/options"
13
        "go.vocdoni.io/dvote/log"
14
)
15

16
// validateCensusParticipant validates that a census participant can be created
17
// by checking that the census exists, the organization exists, and the member exists
18
func (ms *MongoStorage) validateCensusParticipant(participant *CensusParticipant) (string, error) {
8✔
19
        // validate required fields
8✔
20
        if len(participant.ParticipantID) == 0 || len(participant.CensusID) == 0 {
10✔
21
                return "", ErrInvalidData
2✔
22
        }
2✔
23

24
        // check that the published census exists
25
        census, err := ms.Census(participant.CensusID)
6✔
26
        if err != nil {
7✔
27
                return "", fmt.Errorf("failed to get published census: %w", err)
1✔
28
        }
1✔
29

30
        // check that the org exists
31
        _, err = ms.Organization(census.OrgAddress)
5✔
32
        if err != nil {
5✔
33
                if err == ErrNotFound {
×
34
                        return "", ErrInvalidData
×
35
                }
×
36
                return "", fmt.Errorf("organization not found: %w", err)
×
37
        }
38

39
        // check that the member exists
40
        if _, err := ms.OrgMember(census.OrgAddress, participant.ParticipantID); err != nil {
6✔
41
                return "", fmt.Errorf("failed to get org member: %w", err)
1✔
42
        }
1✔
43

44
        return census.OrgAddress, nil
4✔
45
}
46

47
// SetCensusParticipant creates or updates a census participant in the database.
48
// If the participant already exists (same participantID and censusID), it updates it.
49
// If it doesn't exist, it creates a new one.
50
func (ms *MongoStorage) SetCensusParticipant(participant *CensusParticipant) error {
8✔
51
        // Validate the participant
8✔
52
        _, err := ms.validateCensusParticipant(participant)
8✔
53
        if err != nil {
12✔
54
                return err
4✔
55
        }
4✔
56

57
        // prepare filter for upsert
58
        filter := bson.M{
4✔
59
                "participantID": participant.ParticipantID,
4✔
60
                "censusId":      participant.CensusID,
4✔
61
        }
4✔
62

4✔
63
        // set timestamps
4✔
64
        now := time.Now()
4✔
65
        participant.UpdatedAt = now
4✔
66
        if participant.CreatedAt.IsZero() {
7✔
67
                participant.CreatedAt = now
3✔
68
        }
3✔
69

70
        // create update document
71
        updateDoc := bson.M{
4✔
72
                "$set": participant,
4✔
73
        }
4✔
74

4✔
75
        // Perform database operation
4✔
76
        ms.keysLock.Lock()
4✔
77
        defer ms.keysLock.Unlock()
4✔
78

4✔
79
        ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
4✔
80
        defer cancel()
4✔
81

4✔
82
        opts := options.Update().SetUpsert(true)
4✔
83
        if _, err := ms.censusParticipants.UpdateOne(ctx, filter, updateDoc, opts); err != nil {
4✔
84
                return fmt.Errorf("failed to set census participant: %w", err)
×
85
        }
×
86

87
        return nil
4✔
88
}
89

90
// CensusParticipant retrieves a census participant from the database based on
91
// participantID and censusID. Returns ErrNotFound if the participant doesn't exist.
92
func (ms *MongoStorage) CensusParticipant(censusID, id string) (*CensusParticipant, error) {
9✔
93
        // create a context with a timeout
9✔
94
        ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
9✔
95
        defer cancel()
9✔
96

9✔
97
        // validate input
9✔
98
        if len(id) == 0 || len(censusID) == 0 {
11✔
99
                return nil, ErrInvalidData
2✔
100
        }
2✔
101

102
        // prepare filter for find
103
        filter := bson.M{
7✔
104
                "participantID": id,
7✔
105
                "censusId":      censusID,
7✔
106
        }
7✔
107

7✔
108
        // find the participant
7✔
109
        participant := &CensusParticipant{}
7✔
110
        err := ms.censusParticipants.FindOne(ctx, filter).Decode(participant)
7✔
111
        if err != nil {
9✔
112
                if err == mongo.ErrNoDocuments {
4✔
113
                        return nil, ErrNotFound
2✔
114
                }
2✔
115
                return nil, fmt.Errorf("failed to get census participant: %w", err)
×
116
        }
117

118
        return participant, nil
5✔
119
}
120

121
// CensusParticipantByMemberNumber retrieves a census participant from the database based on
122
// memberNumber and censusID. Returns ErrNotFound if the participant doesn't exist.
123
func (ms *MongoStorage) CensusParticipantByMemberNumber(
124
        censusID string,
125
        memberNumber string,
126
        orgAddress string,
127
) (*CensusParticipant, error) {
12✔
128
        // create a context with a timeout
12✔
129
        ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
12✔
130
        defer cancel()
12✔
131

12✔
132
        // validate input
12✔
133
        if len(memberNumber) == 0 || len(censusID) == 0 {
12✔
134
                return nil, ErrInvalidData
×
135
        }
×
136

137
        orgMember, err := ms.OrgMemberByMemberNumber(orgAddress, memberNumber)
12✔
138
        if err != nil {
13✔
139
                if err == mongo.ErrNoDocuments || err == ErrNotFound {
2✔
140
                        return nil, ErrNotFound
1✔
141
                }
1✔
142
                return nil, fmt.Errorf("failed to get org member: %w", err)
×
143
        }
144

145
        // prepare filter for find
146
        filter := bson.M{
11✔
147
                "participantID": orgMember.ID.Hex(),
11✔
148
                "censusId":      censusID,
11✔
149
        }
11✔
150

11✔
151
        // find the participant
11✔
152
        participant := &CensusParticipant{}
11✔
153
        err = ms.censusParticipants.FindOne(ctx, filter).Decode(participant)
11✔
154
        if err != nil {
11✔
155
                if err == mongo.ErrNoDocuments {
×
156
                        return nil, ErrNotFound
×
157
                }
×
158
                return nil, fmt.Errorf("failed to get census participant: %w", err)
×
159
        }
160

161
        return participant, nil
11✔
162
}
163

164
// DelCensusParticipant removes a census participant from the database.
165
// Returns nil if the participant was successfully deleted or didn't exist.
166
func (ms *MongoStorage) DelCensusParticipant(censusID, participantID string) error {
4✔
167
        ms.keysLock.Lock()
4✔
168
        defer ms.keysLock.Unlock()
4✔
169
        // create a context with a timeout
4✔
170
        ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
4✔
171
        defer cancel()
4✔
172

4✔
173
        // validate input
4✔
174
        if len(participantID) == 0 || len(censusID) == 0 {
6✔
175
                return ErrInvalidData
2✔
176
        }
2✔
177

178
        // prepare filter for upsert
179
        filter := bson.M{
2✔
180
                "participantID": participantID,
2✔
181
                "censusId":      censusID,
2✔
182
        }
2✔
183

2✔
184
        // delete the participant
2✔
185
        _, err := ms.censusParticipants.DeleteOne(ctx, filter)
2✔
186
        if err != nil {
2✔
187
                return fmt.Errorf("failed to delete census participant: %w", err)
×
188
        }
×
189

190
        return nil
2✔
191
}
192

193
// BulkCensusParticipantStatus is returned by SetBylkCensusParticipant to provide the output.
194
type BulkCensusParticipantStatus struct {
195
        Progress int `json:"progress"`
196
        Total    int `json:"total"`
197
        Added    int `json:"added"`
198
}
199

200
// prepareMember processes a member for storage by:
201
// - Setting the organization address
202
// - Setting the creation timestamp
203
// - Hashing sensitive data (email, phone, password)
204
// - Clearing the original sensitive data
205
func prepareMember(member *OrgMember, orgAddress, salt string, currentTime time.Time) {
17✔
206
        // Assign a new internal ID if not provided
17✔
207
        if member.ID == primitive.NilObjectID {
32✔
208
                member.ID = primitive.NewObjectID()
15✔
209
        }
15✔
210

211
        member.OrgAddress = orgAddress
17✔
212
        member.CreatedAt = currentTime
17✔
213

17✔
214
        // Hash phone if valid
17✔
215
        if member.Phone != "" {
34✔
216
                pn, err := internal.SanitizeAndVerifyPhoneNumber(member.Phone)
17✔
217
                if err != nil {
23✔
218
                        log.Warnw("invalid phone number", "phone", member.Phone)
6✔
219
                        member.Phone = ""
6✔
220
                } else {
17✔
221
                        member.HashedPhone = internal.HashOrgData(orgAddress, pn)
11✔
222
                        member.Phone = ""
11✔
223
                }
11✔
224
        }
225

226
        // Hash password if present
227
        if member.Password != "" {
29✔
228
                member.HashedPass = internal.HashPassword(salt, member.Password)
12✔
229
                member.Password = ""
12✔
230
        }
12✔
231
}
232

233
// createCensusParticipantBulkOperations creates the bulk write operations for members and participants
234
func createCensusParticipantBulkOperations(
235
        orgMembers []OrgMember,
236
        orgAddress string,
237
        censusID string,
238
        salt string,
239
        currentTime time.Time,
240
) (orgMembersOps []mongo.WriteModel, censusParticipantOps []mongo.WriteModel) {
7✔
241
        var bulkOrgMembersOps []mongo.WriteModel
7✔
242
        var bulkCensusParticipantsOps []mongo.WriteModel
7✔
243

7✔
244
        for _, orgMember := range orgMembers {
24✔
245
                // Prepare the member
17✔
246
                prepareMember(&orgMember, orgAddress, salt, currentTime)
17✔
247

17✔
248
                // Create member filter and update document
17✔
249
                memberFilter := bson.M{
17✔
250
                        "_id":        orgMember.ID,
17✔
251
                        "orgAddress": orgAddress,
17✔
252
                }
17✔
253

17✔
254
                updateOrgMembersDoc, err := dynamicUpdateDocument(orgMember, nil)
17✔
255
                if err != nil {
17✔
256
                        log.Warnw("failed to create update document for member",
×
257
                                "error", err, "ID", orgMember.ID)
×
258
                        continue // Skip this member but continue with others
×
259
                }
260

261
                // Create member upsert model
262
                upsertOrgMembersModel := mongo.NewUpdateOneModel().
17✔
263
                        SetFilter(memberFilter).
17✔
264
                        SetUpdate(updateOrgMembersDoc).
17✔
265
                        SetUpsert(true)
17✔
266
                bulkOrgMembersOps = append(bulkOrgMembersOps, upsertOrgMembersModel)
17✔
267

17✔
268
                // Create participant filter and document
17✔
269
                censusParticipantsFilter := bson.M{
17✔
270
                        "participantID": orgMember.ID.Hex(),
17✔
271
                        "censusId":      censusID,
17✔
272
                }
17✔
273
                participantDoc := &CensusParticipant{
17✔
274
                        ParticipantID: orgMember.ID.Hex(),
17✔
275
                        CensusID:      censusID,
17✔
276
                        CreatedAt:     currentTime,
17✔
277
                }
17✔
278

17✔
279
                // Create participant update document
17✔
280
                updateParticipantDoc, err := dynamicUpdateDocument(participantDoc, nil)
17✔
281
                if err != nil {
17✔
282
                        log.Warnw("failed to create update document for participant",
×
283
                                "error", err, "participantID", orgMember.ID.Hex())
×
284
                        continue
×
285
                }
286

287
                // Create participant upsert model
288
                upsertCensusParticipantsModel := mongo.NewUpdateOneModel().
17✔
289
                        SetFilter(censusParticipantsFilter).
17✔
290
                        SetUpdate(updateParticipantDoc).
17✔
291
                        SetUpsert(true)
17✔
292
                bulkCensusParticipantsOps = append(bulkCensusParticipantsOps, upsertCensusParticipantsModel)
17✔
293
        }
294

295
        return bulkOrgMembersOps, bulkCensusParticipantsOps
7✔
296
}
297

298
// processBatch processes a batch of members and returns the number added
299
func (ms *MongoStorage) processBatch(
300
        bulkOrgMembersOps []mongo.WriteModel,
301
        bulkCensusParticipantOps []mongo.WriteModel,
302
) int {
7✔
303
        if len(bulkOrgMembersOps) == 0 {
7✔
304
                return 0
×
305
        }
×
306

307
        // Only lock the mutex during the actual database operations
308
        ms.keysLock.Lock()
7✔
309
        defer ms.keysLock.Unlock()
7✔
310

7✔
311
        // Create a new context for the batch
7✔
312
        batchCtx, batchCancel := context.WithTimeout(context.Background(), batchTimeout)
7✔
313
        defer batchCancel()
7✔
314

7✔
315
        // Execute the bulk write operations for org members
7✔
316
        _, err := ms.orgMembers.BulkWrite(batchCtx, bulkOrgMembersOps)
7✔
317
        if err != nil {
7✔
318
                log.Warnw("failed to perform bulk operation on members", "error", err)
×
319
                return 0
×
320
        }
×
321

322
        // Execute the bulk write operations for census participants
323
        _, err = ms.censusParticipants.BulkWrite(batchCtx, bulkCensusParticipantOps)
7✔
324
        if err != nil {
7✔
325
                log.Warnw("failed to perform bulk operation on participants", "error", err)
×
326
                return 0
×
327
        }
×
328

329
        return len(bulkOrgMembersOps)
7✔
330
}
331

332
// startProgressReporter starts a goroutine that reports progress periodically
333
func startProgressReporter(
334
        ctx context.Context,
335
        progressChan chan<- *BulkCensusParticipantStatus,
336
        totalOrgMembers int,
337
        processedOrgMembers *int,
338
        addedOrgMembers *int,
339
) {
7✔
340
        ticker := time.NewTicker(10 * time.Second)
7✔
341
        defer ticker.Stop()
7✔
342

7✔
343
        for {
14✔
344
                select {
7✔
345
                case <-ticker.C:
×
346
                        // Calculate and send progress percentage
×
347
                        if totalOrgMembers > 0 {
×
348
                                progress := (*processedOrgMembers * 100) / totalOrgMembers
×
349
                                progressChan <- &BulkCensusParticipantStatus{
×
350
                                        Progress: progress,
×
351
                                        Total:    totalOrgMembers,
×
352
                                        Added:    *addedOrgMembers,
×
353
                                }
×
354
                        }
×
355
                case <-ctx.Done():
7✔
356
                        return
7✔
357
                }
358
        }
359
}
360

361
// validateBulkCensusParticipant validates the input parameters for bulk census participant
362
// and returns the census if valid
363
func (ms *MongoStorage) validateBulkCensusParticipant(
364
        censusID string,
365
        orgMembersSize int,
366
) (*Census, error) {
10✔
367
        // Early returns for invalid input
10✔
368
        if orgMembersSize == 0 {
11✔
369
                return nil, nil // Not an error, just no work to do
1✔
370
        }
1✔
371
        if len(censusID) == 0 {
10✔
372
                return nil, ErrInvalidData
1✔
373
        }
1✔
374

375
        // Validate census and organization
376
        census, err := ms.Census(censusID)
8✔
377
        if err != nil {
9✔
378
                return nil, fmt.Errorf("failed to get published census: %w", err)
1✔
379
        }
1✔
380

381
        if _, err := ms.Organization(census.OrgAddress); err != nil {
7✔
382
                return nil, err
×
383
        }
×
384

385
        return census, nil
7✔
386
}
387

388
// processBatches processes members in batches and sends progress updates
389
func (ms *MongoStorage) processBatches(
390
        orgMembers []OrgMember,
391
        census *Census,
392
        censusID string,
393
        salt string,
394
        progressChan chan<- *BulkCensusParticipantStatus,
395
) {
7✔
396
        defer close(progressChan)
7✔
397

7✔
398
        // Process members in batches of 200
7✔
399
        batchSize := 200
7✔
400
        totalOrgMembers := len(orgMembers)
7✔
401
        processedOrgMembers := 0
7✔
402
        addedOrgMembers := 0
7✔
403
        currentTime := time.Now()
7✔
404

7✔
405
        // Send initial progress
7✔
406
        progressChan <- &BulkCensusParticipantStatus{
7✔
407
                Progress: 0,
7✔
408
                Total:    totalOrgMembers,
7✔
409
                Added:    addedOrgMembers,
7✔
410
        }
7✔
411

7✔
412
        // Create a context for the entire operation
7✔
413
        ctx, cancel := context.WithCancel(context.Background())
7✔
414
        defer cancel()
7✔
415

7✔
416
        // Start progress reporter in a separate goroutine
7✔
417
        go startProgressReporter(ctx, progressChan, totalOrgMembers, &processedOrgMembers, &addedOrgMembers)
7✔
418

7✔
419
        // Process members in batches
7✔
420
        for i := 0; i < totalOrgMembers; i += batchSize {
14✔
421
                // Calculate end index for current batch
7✔
422
                end := i + batchSize
7✔
423
                if end > totalOrgMembers {
14✔
424
                        end = totalOrgMembers
7✔
425
                }
7✔
426

427
                // Create bulk operations for this batch
428
                bulkOrgMembersOps, bulkCensusParticipantOps := createCensusParticipantBulkOperations(
7✔
429
                        orgMembers[i:end],
7✔
430
                        census.OrgAddress,
7✔
431
                        censusID,
7✔
432
                        salt,
7✔
433
                        currentTime,
7✔
434
                )
7✔
435

7✔
436
                // Process the batch and get number of added members
7✔
437
                added := ms.processBatch(bulkOrgMembersOps, bulkCensusParticipantOps)
7✔
438
                addedOrgMembers += added
7✔
439

7✔
440
                // Update processed count
7✔
441
                processedOrgMembers += (end - i)
7✔
442
        }
443

444
        // Send final progress (100%)
445
        progressChan <- &BulkCensusParticipantStatus{
7✔
446
                Progress: 100,
7✔
447
                Total:    totalOrgMembers,
7✔
448
                Added:    addedOrgMembers,
7✔
449
        }
7✔
450
}
451

452
// SetBulkCensusOrgMemberParticipant creates or updates an org member and a census participant in the database.
453
// If the participant already exists (same participantID and censusID), it updates it.
454
// If it doesn't exist, it creates a new one.
455
// Processes members in batches of 200 entries.
456
// Returns a channel that sends the percentage of members processed every 10 seconds.
457
// This function must be called in a goroutine.
458
func (ms *MongoStorage) SetBulkCensusOrgMemberParticipant(
459
        salt, censusID string, orgMembers []OrgMember,
460
) (chan *BulkCensusParticipantStatus, error) {
10✔
461
        progressChan := make(chan *BulkCensusParticipantStatus, 10)
10✔
462

10✔
463
        // Validate input parameters
10✔
464
        census, err := ms.validateBulkCensusParticipant(censusID, len(orgMembers))
10✔
465
        if err != nil {
12✔
466
                close(progressChan)
2✔
467
                return progressChan, err
2✔
468
        }
2✔
469

470
        // If no members, return empty channel
471
        if census == nil {
9✔
472
                close(progressChan)
1✔
473
                return progressChan, nil
1✔
474
        }
1✔
475

476
        // Start processing in a goroutine
477
        go ms.processBatches(orgMembers, census, censusID, salt, progressChan)
7✔
478

7✔
479
        return progressChan, nil
7✔
480
}
481

482
func (ms *MongoStorage) setBulkCensusParticipant(
483
        ctx context.Context, censusID string, memberIDs []primitive.ObjectID,
484
) error {
2✔
485
        currentTime := time.Now()
2✔
486
        docs := make([]mongo.WriteModel, 0, len(memberIDs))
2✔
487
        for _, pid := range memberIDs {
6✔
488
                // Create participant filter and document
4✔
489
                id := pid.Hex()
4✔
490
                censusParticipantsFilter := bson.M{
4✔
491
                        "participantID": id,
4✔
492
                        "censusId":      censusID,
4✔
493
                }
4✔
494
                participantDoc := &CensusParticipant{
4✔
495
                        ParticipantID: id,
4✔
496
                        CensusID:      censusID,
4✔
497
                        CreatedAt:     currentTime,
4✔
498
                        UpdatedAt:     currentTime,
4✔
499
                }
4✔
500

4✔
501
                // Create participant update document
4✔
502
                updateParticipantDoc, err := dynamicUpdateDocument(participantDoc, nil)
4✔
503
                if err != nil {
4✔
NEW
504
                        log.Warnw("failed to create update document for participant",
×
NEW
505
                                "error", err, "participantID", id)
×
NEW
506
                        continue
×
507
                }
508

509
                // Create participant upsert model
510
                upsertCensusParticipantsModel := mongo.NewUpdateOneModel().
4✔
511
                        SetFilter(censusParticipantsFilter).
4✔
512
                        SetUpdate(updateParticipantDoc).
4✔
513
                        SetUpsert(true)
4✔
514
                docs = append(docs, upsertCensusParticipantsModel)
4✔
515
        }
516
        // Unordered makes it continue on errors (e.g., one dup), but you can set Ordered=true if you prefer.
517
        bulkOpts := options.BulkWrite().SetOrdered(false)
2✔
518

2✔
519
        _, err := ms.censusParticipants.BulkWrite(ctx, docs, bulkOpts)
2✔
520
        return err
2✔
521
}
522

523
// CensusParticipants retrieves all the census participants for a given census.
524
func (ms *MongoStorage) CensusParticipants(censusID string) ([]CensusParticipant, error) {
2✔
525
        // create a context with a timeout
2✔
526
        ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
2✔
527
        defer cancel()
2✔
528

2✔
529
        // validate input
2✔
530
        if len(censusID) == 0 {
2✔
531
                return nil, ErrInvalidData
×
532
        }
×
533

534
        // prepare filter for upsert
535
        filter := bson.M{
2✔
536
                "censusId": censusID,
2✔
537
        }
2✔
538

2✔
539
        // find the participant
2✔
540
        cursor, err := ms.censusParticipants.Find(ctx, filter)
2✔
541
        if err != nil {
2✔
542
                return nil, fmt.Errorf("failed to get census participants: %w", err)
×
543
        }
×
544
        defer func() {
4✔
545
                if err := cursor.Close(ctx); err != nil {
2✔
546
                        log.Warnw("error closing cursor", "error", err)
×
547
                }
×
548
        }()
549
        var participants []CensusParticipant
2✔
550
        if err := cursor.All(ctx, &participants); err != nil {
2✔
551
                return nil, fmt.Errorf("failed to get census participants: %w", err)
×
552
        }
×
553

554
        return participants, nil
2✔
555
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc