• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

GrottoCenter / grottocenter-api / 25580542847

08 May 2026 09:30PM UTC coverage: 86.665% (-0.2%) from 86.873%
25580542847

Pull #1573

github

ClemRz
feat(geocoding): replace sync Nominatim with offline country resolution and async queue

Resolves #1570

- Add CountryResolverService using @rapideditor/country-coder for offline
  point-in-polygon country resolution with hierarchy-based fallback
- Add EnrichmentQueueService using pg-boss (PostgreSQL-backed) for async
  Nominatim enrichment of region, county, city, iso_3166_2
- Remove all synchronous GeocodingService.reverse() calls from entrance
  create/update and organization create/update
- Country cache loaded from t_country at startup, held in memory
- Queue processes at 1 req/s with exponential backoff retry (max 5)
- Stale enrichment fields cleared on coordinate change
- Trace ID propagated from request to background job logs
- Fallback to '00' (Undefined) when country cannot be determined
Pull Request #1573: feat(geocoding): offline country resolution + async enrichment queue

3082 of 3682 branches covered (83.7%)

Branch coverage included in aggregate %.

94 of 126 new or added lines in 8 files covered. (74.6%)

1 existing line in 1 file now uncovered.

6329 of 7177 relevant lines covered (88.18%)

53.14 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.93
/api/services/EnrichmentQueueService.js
/**
 * EnrichmentQueueService.js
 *
 * @description :: Manages the pg-boss backed job queue for asynchronous
 *   Nominatim reverse-geocoding enrichment of entrances and organizations.
 *
 * NOTE: The pg-boss instance is stored on `sails.enrichmentBoss` rather than
 * in a module-level variable because Sails' include-all clears the Node
 * require cache after loading services. A `let boss` at module scope would
 * therefore be a different variable for each require() call, which is what
 * caused the "queue not initialized" issue.
 */
13

14
const { PgBoss } = require('pg-boss');
5✔
15
const logger = require('../utils/logger');
5✔
16

17
/** Name of the pg-boss queue that carries geocoding enrichment jobs. */
const QUEUE_NAME = 'geocoding-enrichment';

/**
 * `err.code` values that the job processor treats as transient: errors
 * carrying one of these codes are rethrown by the worker instead of being
 * logged as permanent failures.
 *
 * Frozen so that no consumer can alter the retry classification at runtime;
 * it remains a plain array, so `.includes()` lookups keep working.
 */
const TRANSIENT_ERROR_CODES = Object.freeze([
  'ECONNREFUSED',
  'ETIMEDOUT',
  'ENOTFOUND',
]);
5✔
20

21
module.exports = {
5✔
22
  /**
23
   * Set boss reference (for testing).
24
   * @param {PgBoss|object|null} instance
25
   */
26
  setBoss(instance) {
27
    sails.enrichmentBoss = instance;
3✔
28
  },
29

30
  /**
31
   * Initialize pg-boss and register the worker.
32
   * Called from bootstrap.js.
33
   */
34
  async start() {
NEW
35
    const connectionString = sails.config.datastores.default.url;
×
NEW
36
    sails.enrichmentBoss = new PgBoss({
×
37
      connectionString,
38
      retryLimit: 5,
39
      retryDelay: 30,
40
      retryBackoff: true,
41
      expireInHours: 24,
42
    });
43

NEW
44
    sails.enrichmentBoss.on('error', (err) => {
×
NEW
45
      sails.log.error(
×
46
        'EnrichmentQueueService pg-boss error:',
47
        err.message || err
×
48
      );
49
    });
50

NEW
51
    await sails.enrichmentBoss.start();
×
NEW
52
    await sails.enrichmentBoss.createQueue(QUEUE_NAME);
×
53

NEW
54
    await sails.enrichmentBoss.work(
×
55
      QUEUE_NAME,
56
      { newJobCheckInterval: 1000 },
57
      module.exports.processJob
58
    );
59

NEW
60
    sails.log.info('EnrichmentQueueService: started and listening for jobs');
×
61
  },
62

63
  /**
64
   * Gracefully stop pg-boss.
65
   */
66
  async stop() {
NEW
67
    if (sails.enrichmentBoss) {
×
NEW
68
      await sails.enrichmentBoss.stop({ graceful: true, timeout: 30000 });
×
NEW
69
      sails.log.info('EnrichmentQueueService: stopped gracefully');
×
70
    }
71
  },
72

73
  /**
74
   * Enqueue an enrichment job.
75
   *
76
   * @param {number} entityId - The ID of the entrance or organization
77
   * @param {'entrance'|'organization'} entityType
78
   * @param {string} [traceId] - Optional trace ID from the originating request
79
   */
80
  async enqueue(entityId, entityType, traceId) {
81
    if (!sails.enrichmentBoss) {
12✔
82
      sails.log.warn(
11✔
83
        'EnrichmentQueueService: queue not initialized, skipping enqueue'
84
      );
85
      return;
11✔
86
    }
87
    await sails.enrichmentBoss.send(
1✔
88
      QUEUE_NAME,
89
      { entityId, entityType, traceId },
90
      { singletonKey: `${entityType}-${entityId}` }
91
    );
92
    sails.log.info(`Enrichment job enqueued: ${entityType} ${entityId}`);
1✔
93
  },
94

95
  /**
96
   * Process enrichment jobs (pg-boss v12 passes an array of jobs).
97
   * @param {object[]} jobs - array of pg-boss job objects
98
   */
99
  async processJob(jobs) {
100
    const jobList = Array.isArray(jobs) ? jobs : [jobs];
3!
101
    for (const job of jobList) {
3✔
102
      if (!job.data) {
3!
NEW
103
        sails.log.warn(
×
104
          `EnrichmentQueueService: job ${job.id} has no data, skipping`
105
        );
106
        // eslint-disable-next-line no-continue
NEW
107
        continue;
×
108
      }
109
      const { entityId, entityType, traceId } = job.data;
3✔
110

111
      const tid = traceId || 'enrichment';
3✔
112
      // eslint-disable-next-line no-await-in-loop
113
      await logger.run(tid, async () => {
3✔
114
        const startTime = Date.now();
3✔
115
        try {
3✔
116
          if (entityType === 'entrance') {
3!
117
            await module.exports.processEntrance(entityId);
3✔
NEW
118
          } else if (entityType === 'organization') {
×
NEW
119
            await module.exports.processOrganization(entityId);
×
120
          } else {
NEW
121
            sails.log.warn(
×
122
              `EnrichmentQueueService: unknown entityType "${entityType}" for job ${job.id}`
123
            );
124
          }
125
        } catch (err) {
126
          if (
3✔
127
            err.statusCode === 429 ||
5✔
128
            TRANSIENT_ERROR_CODES.includes(err.code)
129
          ) {
130
            throw err;
2✔
131
          }
132
          sails.log.error(
1✔
133
            `EnrichmentQueueService: permanent failure for ${entityType} ${entityId} (${Date.now() - startTime}ms):`,
134
            err
135
          );
136
          return;
1✔
137
        }
NEW
138
        sails.log.info(
×
139
          `Enrichment job done: ${entityType} ${entityId} ${Date.now() - startTime}ms`
140
        );
141
      });
142
    }
143
  },
144

145
  /**
146
   * Process an entrance enrichment job.
147
   * @param {number} entranceId
148
   */
149
  async processEntrance(entranceId) {
150
    sails.log.info(`Enrichment job started: entrance ${entranceId}`);
7✔
151
    const entrance = await TEntrance.findOne({ id: entranceId });
7✔
152
    if (!entrance || entrance.isDeleted) {
4✔
153
      sails.log.info(
2✔
154
        `Enrichment job skipped: entrance ${entranceId} (not found or deleted)`
155
      );
156
      return;
2✔
157
    }
158

159
    const address = await sails.services.geocodingservice.reverse(
2✔
160
      entrance.latitude,
161
      entrance.longitude
162
    );
163
    if (!address) {
2✔
164
      sails.log.info(
1✔
165
        `Enrichment job completed: entrance ${entranceId} (no address from Nominatim)`
166
      );
167
      return;
1✔
168
    }
169

170
    await TEntrance.updateOne({ id: entranceId }).set({
1✔
171
      region: address.region,
172
      county: address.county,
173
      city: address.city,
174
      iso_3166_2: address.iso_3166_2,
175
    });
176
    sails.log.info(
1✔
177
      `Enrichment job completed: entrance ${entranceId} (region=${address.region}, iso_3166_2=${address.iso_3166_2})`
178
    );
179
  },
180

181
  /**
182
   * Process an organization enrichment job.
183
   * @param {number} organizationId
184
   */
185
  async processOrganization(organizationId) {
186
    sails.log.info(`Enrichment job started: organization ${organizationId}`);
2✔
187
    const org = await TGrotto.findOne({ id: organizationId });
2✔
188
    if (!org || org.isDeleted) {
2!
NEW
189
      sails.log.info(
×
190
        `Enrichment job skipped: organization ${organizationId} (not found or deleted)`
191
      );
NEW
192
      return;
×
193
    }
194

195
    if (!org.latitude || !org.longitude) {
2✔
196
      sails.log.info(
1✔
197
        `Enrichment job skipped: organization ${organizationId} (no coordinates)`
198
      );
199
      return;
1✔
200
    }
201

202
    const address = await sails.services.geocodingservice.reverse(
1✔
203
      org.latitude,
204
      org.longitude
205
    );
206
    if (!address) {
1!
NEW
207
      sails.log.info(
×
208
        `Enrichment job completed: organization ${organizationId} (no address from Nominatim)`
209
      );
NEW
210
      return;
×
211
    }
212

213
    await TGrotto.updateOne({ id: organizationId }).set({
1✔
214
      iso_3166_2: address.iso_3166_2,
215
    });
216
    sails.log.info(
1✔
217
      `Enrichment job completed: organization ${organizationId} (iso_3166_2=${address.iso_3166_2})`
218
    );
219
  },
220
};
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc