• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

GrottoCenter / grottocenter-api / 25800893259

13 May 2026 01:02PM UTC coverage: 86.196% (-0.5%) from 86.673%
25800893259

push

github

ClemRz
feat(geocoding): replace sync Nominatim with offline country resolution and async queue

Resolves #1570

- Add CountryResolverService using @rapideditor/country-coder for offline
  point-in-polygon country resolution with hierarchy-based fallback
- Add EnrichmentQueueService using pg-boss (PostgreSQL-backed) for async
  Nominatim enrichment of region, county, city, iso_3166_2
- Remove all synchronous GeocodingService.reverse() calls from entrance
  create/update and organization create/update
- Country cache loaded from t_country at startup, held in memory
- Queue processes at 1 req/s with exponential backoff retry (max 5)
- Stale enrichment fields cleared on coordinate change
- Trace ID propagated from request to background job logs
- Fall back to '00' (Undefined) when country cannot be determined

3114 of 3753 branches covered (82.97%)

Branch coverage included in aggregate %.

94 of 142 new or added lines in 8 files covered. (66.2%)

1 existing line in 1 file now uncovered.

6377 of 7258 relevant lines covered (87.86%)

52.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.99
/api/services/EnrichmentQueueService.js
1
/**
2
 * EnrichmentQueueService.js
3
 *
4
 * @description :: Manages the pg-boss backed job queue for asynchronous
5
 *   Nominatim reverse-geocoding enrichment of entrances and organizations.
6
 *
7
 * NOTE: The pg-boss instance is stored on `sails.enrichmentBoss` rather than
8
 * a module-level variable because Sails' include-all clears the Node require
9
 * cache after loading services. A `let boss` at module scope would be a
10
 * different variable for each require() call, causing the "queue not
11
 * initialized" issue.
12
 */
13

14
const { PgBoss } = require('pg-boss');
5✔
15
const logger = require('../utils/logger');
5✔
16

17
// Name of the pg-boss queue; shared by createQueue(), work() and send() below.
const QUEUE_NAME = 'geocoding-enrichment';
5✔
18

19
// Error `code` values for which processJob rethrows the error instead of
// logging it as a permanent failure.
const TRANSIENT_ERROR_CODES = ['ECONNREFUSED', 'ETIMEDOUT', 'ENOTFOUND'];
5✔
20

21
module.exports = {
5✔
22
  /**
23
   * Set boss reference (for testing).
24
   * @param {PgBoss|object|null} instance
25
   */
26
  setBoss(instance) {
27
    sails.enrichmentBoss = instance;
3✔
28
  },
29

30
  /**
31
   * Build the PostgreSQL connection string for pg-boss.
32
   *
33
   * Prefers the explicit ENRICHMENT_QUEUE_DATABASE_URL env var.
34
   * Otherwise builds the string from the Sails datastore config. The `url`
35
   * field is only used if it looks like a full postgres:// URI; in Azure the
36
   * env var `sails_datastores__default__url` is often set to a bare hostname
37
   * (e.g. "myserver.postgres.database.azure.com") which Waterline handles
38
   * via individual params but pg-boss cannot parse.
39
   *
40
   * @returns {string}
41
   */
42
  getConnectionString() {
NEW
43
    if (process.env.ENRICHMENT_QUEUE_DATABASE_URL) {
×
NEW
44
      return process.env.ENRICHMENT_QUEUE_DATABASE_URL;
×
45
    }
NEW
46
    const dsConfig = sails.config.datastores.default;
×
47

48
    // Only trust `url` if it's a proper connection URI
NEW
49
    if (dsConfig.url && dsConfig.url.startsWith('postgres')) {
×
NEW
50
      return dsConfig.url;
×
51
    }
52

53
    // Build from individual params (+ use `url` as host if it's a bare hostname)
NEW
54
    const user = dsConfig.user || 'postgres';
×
NEW
55
    const password = dsConfig.password || '';
×
NEW
56
    const host = dsConfig.host || dsConfig.url || 'localhost';
×
NEW
57
    const port = dsConfig.port || 5432;
×
NEW
58
    const database = dsConfig.database || 'postgres';
×
NEW
59
    const ssl = dsConfig.ssl ? '?sslmode=require' : '';
×
NEW
60
    return `postgres://${user}:${encodeURIComponent(password)}@${host}:${port}/${database}${ssl}`;
×
61
  },
62

63
  /**
64
   * Initialize pg-boss and register the worker.
65
   * Called from bootstrap.js.
66
   */
67
  async start() {
NEW
68
    const connectionString = module.exports.getConnectionString();
×
NEW
69
    sails.enrichmentBoss = new PgBoss({
×
70
      connectionString,
71
      retryLimit: 5,
72
      retryDelay: 30,
73
      retryBackoff: true,
74
      expireInHours: 24,
75
    });
76

NEW
77
    sails.enrichmentBoss.on('error', (err) => {
×
NEW
78
      sails.log.error(
×
79
        'EnrichmentQueueService pg-boss error:',
80
        err.message || err
×
81
      );
82
    });
83

NEW
84
    await sails.enrichmentBoss.start();
×
NEW
85
    await sails.enrichmentBoss.createQueue(QUEUE_NAME);
×
86

87
    // NOTE: Single-instance constraint — Nominatim rate limit compliance
88
    // (1 req/s) relies on having a single worker process. The poll interval
89
    // plus response latency naturally throttles to ~1 req/s. If horizontal
90
    // scaling is introduced (multiple API instances), an explicit per-job
91
    // delay or a dedicated worker process must be added.
NEW
92
    await sails.enrichmentBoss.work(
×
93
      QUEUE_NAME,
94
      { newJobCheckInterval: 5000 },
95
      module.exports.processJob
96
    );
97

NEW
98
    sails.log.info('EnrichmentQueueService: started and listening for jobs');
×
99
  },
100

101
  /**
102
   * Gracefully stop pg-boss.
103
   */
104
  async stop() {
NEW
105
    if (sails.enrichmentBoss) {
×
NEW
106
      await sails.enrichmentBoss.stop({ graceful: true, timeout: 30000 });
×
NEW
107
      sails.log.info('EnrichmentQueueService: stopped gracefully');
×
108
    }
109
  },
110

111
  /**
112
   * Enqueue an enrichment job.
113
   *
114
   * @param {number} entityId - The ID of the entrance or organization
115
   * @param {'entrance'|'organization'} entityType
116
   * @param {string} [traceId] - Optional trace ID from the originating request
117
   */
118
  async enqueue(entityId, entityType, traceId) {
119
    if (!sails.enrichmentBoss) {
12✔
120
      sails.log.warn(
11✔
121
        'EnrichmentQueueService: queue not initialized, skipping enqueue'
122
      );
123
      return;
11✔
124
    }
125
    await sails.enrichmentBoss.send(
1✔
126
      QUEUE_NAME,
127
      { entityId, entityType, traceId },
128
      { singletonKey: `${entityType}-${entityId}` }
129
    );
130
    sails.log.info(`Enrichment job enqueued: ${entityType} ${entityId}`);
1✔
131
  },
132

133
  /**
134
   * Process enrichment jobs (pg-boss v12 passes an array of jobs).
135
   * @param {object[]} jobs - array of pg-boss job objects
136
   */
137
  async processJob(jobs) {
138
    const jobList = Array.isArray(jobs) ? jobs : [jobs];
3!
139
    for (const job of jobList) {
3✔
140
      if (!job.data) {
3!
NEW
141
        sails.log.warn(
×
142
          `EnrichmentQueueService: job ${job.id} has no data, skipping`
143
        );
144
        // eslint-disable-next-line no-continue
NEW
145
        continue;
×
146
      }
147
      const { entityId, entityType, traceId } = job.data;
3✔
148

149
      const tid = traceId || 'enrichment';
3✔
150
      // eslint-disable-next-line no-await-in-loop
151
      await logger.run(tid, async () => {
3✔
152
        const startTime = Date.now();
3✔
153
        try {
3✔
154
          if (entityType === 'entrance') {
3!
155
            await module.exports.processEntrance(entityId);
3✔
NEW
156
          } else if (entityType === 'organization') {
×
NEW
157
            await module.exports.processOrganization(entityId);
×
158
          } else {
NEW
159
            sails.log.warn(
×
160
              `EnrichmentQueueService: unknown entityType "${entityType}" for job ${job.id}`
161
            );
162
          }
163
        } catch (err) {
164
          if (
3✔
165
            err.statusCode === 429 ||
5✔
166
            TRANSIENT_ERROR_CODES.includes(err.code)
167
          ) {
168
            throw err;
2✔
169
          }
170
          sails.log.error(
1✔
171
            `EnrichmentQueueService: permanent failure for ${entityType} ${entityId} (${Date.now() - startTime}ms):`,
172
            err
173
          );
174
          return;
1✔
175
        }
NEW
176
        sails.log.info(
×
177
          `Enrichment job done: ${entityType} ${entityId} ${Date.now() - startTime}ms`
178
        );
179
      });
180
    }
181
  },
182

183
  /**
184
   * Process an entrance enrichment job.
185
   * @param {number} entranceId
186
   */
187
  async processEntrance(entranceId) {
188
    sails.log.info(`Enrichment job started: entrance ${entranceId}`);
7✔
189
    const entrance = await TEntrance.findOne({ id: entranceId });
7✔
190
    if (!entrance || entrance.isDeleted) {
4✔
191
      sails.log.info(
2✔
192
        `Enrichment job skipped: entrance ${entranceId} (not found or deleted)`
193
      );
194
      return;
2✔
195
    }
196

197
    const address = await sails.services.geocodingservice.reverse(
2✔
198
      entrance.latitude,
199
      entrance.longitude
200
    );
201
    if (!address) {
2✔
202
      sails.log.info(
1✔
203
        `Enrichment job completed: entrance ${entranceId} (no address from Nominatim)`
204
      );
205
      return;
1✔
206
    }
207

208
    await TEntrance.updateOne({ id: entranceId }).set({
1✔
209
      region: address.region,
210
      county: address.county,
211
      city: address.city,
212
      iso_3166_2: address.iso_3166_2,
213
    });
214
    sails.log.info(
1✔
215
      `Enrichment job completed: entrance ${entranceId} (region=${address.region}, iso_3166_2=${address.iso_3166_2})`
216
    );
217
  },
218

219
  /**
220
   * Process an organization enrichment job.
221
   * @param {number} organizationId
222
   */
223
  async processOrganization(organizationId) {
224
    sails.log.info(`Enrichment job started: organization ${organizationId}`);
2✔
225
    const org = await TGrotto.findOne({ id: organizationId });
2✔
226
    if (!org || org.isDeleted) {
2!
NEW
227
      sails.log.info(
×
228
        `Enrichment job skipped: organization ${organizationId} (not found or deleted)`
229
      );
NEW
230
      return;
×
231
    }
232

233
    if (!org.latitude || !org.longitude) {
2✔
234
      sails.log.info(
1✔
235
        `Enrichment job skipped: organization ${organizationId} (no coordinates)`
236
      );
237
      return;
1✔
238
    }
239

240
    const address = await sails.services.geocodingservice.reverse(
1✔
241
      org.latitude,
242
      org.longitude
243
    );
244
    if (!address) {
1!
NEW
245
      sails.log.info(
×
246
        `Enrichment job completed: organization ${organizationId} (no address from Nominatim)`
247
      );
NEW
248
      return;
×
249
    }
250

251
    await TGrotto.updateOne({ id: organizationId }).set({
1✔
252
      iso_3166_2: address.iso_3166_2,
253
    });
254
    sails.log.info(
1✔
255
      `Enrichment job completed: organization ${organizationId} (iso_3166_2=${address.iso_3166_2})`
256
    );
257
  },
258
};
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc