• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

restorecommerce / access-control-srv / 9698111598

27 Jun 2024 02:07PM UTC coverage: 74.798% (-1.8%) from 76.633%
9698111598

push

github

Arun-KumarH
fix: Up cfg for importing seed data on startup and updated deps

499 of 669 branches covered (74.59%)

Branch coverage included in aggregate %.

2644 of 3533 relevant lines covered (74.84%)

55.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.39
/src/worker.ts
1
import _ from 'lodash-es';
1✔
2
import * as chassis from '@restorecommerce/chassis-srv';
1✔
3
import { createLogger } from '@restorecommerce/logger';
1✔
4
import { Logger } from 'winston';
1✔
5
import { Events, registerProtoMeta } from '@restorecommerce/kafka-client';
1✔
6
import { AccessControlCommandInterface, AccessControlService } from './accessControlService.js';
1✔
7
import { ResourceManager } from './resourceManager.js';
1✔
8
import { createClient, RedisClientType } from 'redis';
1✔
9
import { Arango } from '@restorecommerce/chassis-srv/lib/database/provider/arango/base.js';
1✔
10
import { AccessController } from './core/accessController.js';
1✔
11
import { ACSAuthZ, initAuthZ, initializeCache } from '@restorecommerce/acs-client';
1✔
12
import { createChannel, createClient as grpcCreateClient } from '@restorecommerce/grpc-client';
1✔
13
import {
1✔
14
  FindByTokenRequest, UserServiceClient, UserServiceDefinition
1✔
15
} from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/user.js';
1✔
16
import {
1✔
17
  RoleAssociation
1✔
18
} from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/auth.js';
1✔
19
import {
1✔
20
  RuleServiceDefinition,
1✔
21
  protoMetadata as ruleMeta
1✔
22
} from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/rule.js';
1✔
23
import {
1✔
24
  PolicyServiceDefinition,
1✔
25
  protoMetadata as policyMeta
1✔
26
} from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/policy.js';
1✔
27
import {
1✔
28
  PolicySetServiceDefinition,
1✔
29
  protoMetadata as policySetMeta
1✔
30
} from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/policy_set.js';
1✔
31
import {
1✔
32
  AccessControlServiceDefinition,
1✔
33
  protoMetadata as accessControlMeta
1✔
34
} from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/access_control.js';
1✔
35
import {
1✔
36
  CommandInterfaceServiceDefinition,
1✔
37
  protoMetadata as commandInterfaceMeta
1✔
38
} from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/commandinterface.js';
1✔
39
import { protoMetadata as reflectionMeta } from '@restorecommerce/rc-grpc-clients/dist/generated-server/grpc/reflection/v1alpha/reflection.js';
1✔
40
import { protoMetadata as authMeta } from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/auth.js';
1✔
41
import {
1✔
42
  protoMetadata as userMeta
1✔
43
} from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/user.js';
1✔
44
import {
1✔
45
  HealthDefinition
1✔
46
} from '@restorecommerce/rc-grpc-clients/dist/generated-server/grpc/health/v1/health.js';
1✔
47
import { BindConfig } from '@restorecommerce/chassis-srv/lib/microservice/transport/provider/grpc/index.js';
1✔
48
import { compareRoleAssociations, flushACSCache } from './core/utils.js';
1✔
49
import * as fs from 'node:fs';
1✔
50
import yaml from 'js-yaml';
1✔
51

1✔
52
/**
 * Converts a snake_case collection name into its PascalCase form,
 * e.g. `policy_set` -> `PolicySet`.
 *
 * The name is split on `_`, the first character of every segment is
 * upper-cased and the segments are concatenated without a separator.
 * Empty segments (leading, trailing or doubled underscores) contribute nothing.
 *
 * @param collectionName snake_case name of the collection
 * @returns the PascalCase label
 */
const capitalized = (collectionName: string): string =>
  collectionName
    .split('_')
    // `slice` replaces the deprecated `String.prototype.substr`
    .map((segment) => segment.charAt(0).toUpperCase() + segment.slice(1))
    .join('');
45✔
58

1✔
59
registerProtoMeta(ruleMeta, policyMeta, policySetMeta, accessControlMeta,
1✔
60
  commandInterfaceMeta, reflectionMeta, authMeta, userMeta);
1✔
61

1✔
62
/**
 * Generates Kafka configs for CRUD events.
 *
 * For the given collection, an entry is produced for each of the
 * `<collection>Created`, `<collection>Modified` and `<collection>Deleted`
 * events. Every entry carries a `messageObject` built from the
 * `protosServicePrefix` config value, the collection name and its
 * PascalCase label (see `capitalized`).
 *
 * The entries are added to a shallow copy of the `events:kafka` config, so
 * the object handed out by `cfg.get` is not modified by this function.
 *
 * @param collectionName snake_case name of the resource collection
 * @param cfg configuration provider exposing `get(key)`
 * @returns a copy of the `events:kafka` config extended with the CRUD event entries
 */
const genEventsConfig = (collectionName: string, cfg: any): any => {
  const servicePrefix = cfg.get('protosServicePrefix');
  const crudEvents = ['Created', 'Modified', 'Deleted'];

  // copy instead of writing into the shared config object
  const kafkaCfg = { ...cfg.get('events:kafka') };
  for (const event of crudEvents) {
    kafkaCfg[`${collectionName}${event}`] = {
      messageObject: `${servicePrefix}${collectionName}.${capitalized(collectionName)}`
    };
  }
  return kafkaCfg;
};
15✔
78

1✔
79
/**
 * Access Control Service
 *
 * Bootstraps the gRPC server, the Kafka event bus, the Redis client, the
 * access controller and the resource services, and subscribes to the
 * configured Kafka topics.
 */
export class Worker {
  cfg: any;
  logger: Logger;
  server: chassis.Server;
  events: Events;
  commandInterface: AccessControlCommandInterface;
  accessController: AccessController;
  redisClient: RedisClientType<any, any>;
  authZ: ACSAuthZ;
  offsetStore: chassis.OffsetStore;

  /**
   * Starts the service: sets up config and logger, connects Kafka and Redis,
   * binds all gRPC services, starts the server, imports optional seed data,
   * loads the policies and finally subscribes to the configured topics.
   *
   * @param cfg optional configuration provider; defaults to `chassis.config.get()`
   * @param logger optional logger; defaults to one created from the `logger` config
   * @returns the `AccessControlService` instance bound to the server
   */
  async start(cfg?: any, logger?: any): Promise<any> {
    this.cfg = cfg || await chassis.config.get();
    const loggerCfg = this.cfg.get('logger');
    this.logger = logger || createLogger(loggerCfg);

    this.logger.info('Starting access control service');
    const server = new chassis.Server(this.cfg.get('server'), this.logger);  // gRPC server

    let kafkaConfig = this.cfg.get('events:kafka');
    const policySetConfig = genEventsConfig('policy_set', this.cfg);
    const policyConfig = genEventsConfig('policy', this.cfg);
    const ruleConfig = genEventsConfig('rule', this.cfg);

    this.cfg.set('events:kafka',
      _.assign({}, kafkaConfig, policySetConfig, policyConfig, ruleConfig));

    kafkaConfig = this.cfg.get('events:kafka');
    const acsEvents = [
      'policy_setCreated',
      'policy_setModified',
      'policy_setDeleted',
      'policyCreated',
      'policyModified',
      'policyDeleted',
      'ruleCreated',
      'ruleModified',
      'ruleDeleted',
    ];
    const hierarchicalScopesResponse = 'hierarchicalScopesResponse';
    const events = new Events(kafkaConfig, this.logger); // Kafka
    await events.start();
    this.offsetStore = new chassis.OffsetStore(events, this.cfg, this.logger);

    // init Redis Client for subject index
    const redisConfig = this.cfg.get('redis');
    redisConfig.database = this.cfg.get('redis:db-indexes:db-subject');
    this.redisClient = createClient(redisConfig);
    // use this.logger: the `logger` parameter is optional and may be undefined
    this.redisClient.on('error', (err) => this.logger.error('Redis Client Error', { code: err.code, message: err.message, stack: err.stack }));
    await this.redisClient.connect();

    const userTopic = await events.topic(kafkaConfig.topics['user'].topic);
    // instantiate IDS client
    let userService: UserServiceClient;
    const grpcIDSConfig = this.cfg.get('client:user');
    if (grpcIDSConfig) {
      const channel = createChannel(grpcIDSConfig.address);
      userService = grpcCreateClient({
        ...grpcIDSConfig,
        logger: this.logger
      }, UserServiceDefinition, channel);
    }
    this.accessController = new AccessController(this.logger,
      this.cfg.get('policies:options'), userTopic, this.cfg, userService);

    // resources
    const db = await chassis.database.get(this.cfg.get('database:main'), this.logger);
    // init ACS cache
    await initializeCache();
    // init AuthZ
    this.authZ = await initAuthZ(this.cfg) as ACSAuthZ;
    const resourceManager = new ResourceManager(this.cfg, this.logger, events, db,
      this.accessController, this.redisClient, this.authZ);
    await resourceManager.setup();
    // policy set resource
    await server.bind('io-restorecommerce-policy-set-srv', {
      service: PolicySetServiceDefinition,
      implementation: resourceManager.getResourceService('policy_set')
    } as BindConfig<PolicySetServiceDefinition>);
    // policy resource
    await server.bind('io-restorecommerce-policy-srv', {
      service: PolicyServiceDefinition,
      implementation: resourceManager.getResourceService('policy')
    } as BindConfig<PolicyServiceDefinition>);
    // rule resource
    await server.bind('io-restorecommerce-rule-srv', {
      service: RuleServiceDefinition,
      implementation: resourceManager.getResourceService('rule')
    } as BindConfig<RuleServiceDefinition>);
    // access control service
    const accessControlService = new AccessControlService(this.cfg, this.logger, resourceManager, this.accessController);
    await server.bind('io-restorecommerce-access-control-srv', {
      service: AccessControlServiceDefinition,
      implementation: accessControlService
    } as BindConfig<AccessControlServiceDefinition>);
    // command interface
    this.commandInterface = new AccessControlCommandInterface(server, this.cfg,
      this.logger, events, accessControlService, this.redisClient);
    await server.bind('io-restorecommerce-access-control-ci', {
      service: CommandInterfaceServiceDefinition,
      implementation: this.commandInterface
    } as BindConfig<CommandInterfaceServiceDefinition>);

    // health check: ready when the database answers a version request
    await server.bind('grpc-health-v1', {
      service: HealthDefinition,
      implementation: new chassis.Health(this.commandInterface, {
        readiness: async () => !!await ((db as Arango).db).version()
      })
    } as BindConfig<HealthDefinition>);

    this.events = events;
    this.server = server;
    await server.start();

    // load seed policy_sets, policies and rules if it exists
    await this.loadSeedData(resourceManager);

    this.logger.info('Access control service started correctly!');
    await accessControlService.loadPolicies();

    const commandTopic = await events.topic(this.cfg.get('events:kafka:topics:command:topic'));
    // Dispatches every subscribed Kafka event by name; anything that is not
    // handled explicitly is forwarded to the command interface.
    const eventListener = async (msg: any,
      context: any, config: any, eventName: string): Promise<any> => {
      if (acsEvents.includes(eventName)) {
        // a policy set, policy or rule changed: reload policies
        await accessControlService.loadPolicies();
      } else if (eventName === hierarchicalScopesResponse) {
        await this.handleHierarchicalScopesResponse(msg);
      } else if (eventName === 'userModified') {
        await this.handleUserModified(msg, commandTopic);
      } else if (eventName === 'userDeleted') {
        this.logger.info('Evicting HR scope for Subject', { id: msg.id });
        await this.accessController.evictHRScopes(msg.id); // flush HR scopes
        // delete ACS cache
        await flushACSCache(msg.id, this.cfg.get('authorization:cache:db-index'), commandTopic, this.logger);
      } else {
        await this.commandInterface.command(msg, context);
      }
    };

    for (const topicLabel of Object.keys(kafkaConfig.topics)) {
      const topicCfg = kafkaConfig.topics[topicLabel];
      const topic = await events.topic(topicCfg.topic);
      const offSetValue = await this.offsetStore.getOffset(topicCfg.topic);
      this.logger.info('subscribing to topic with offset value', topicCfg.topic, offSetValue);
      if (topicCfg.events) {
        for (const eventName of topicCfg.events) {
          await topic.on(eventName, eventListener, { startingOffset: offSetValue });
        }
      }
    }

    return accessControlService;
  }

  /**
   * Imports seed resources listed under the `seed_data` config key.
   *
   * `seed_data` maps an entity name (resolved through
   * `resourceManager.getResourceService`) to the path of a YAML file. Each
   * file is read, parsed and upserted via the service's `superUpsert`.
   * Import is best effort: a failure is logged and the next entity is processed.
   *
   * @param resourceManager manager providing the resource service per entity
   */
  private async loadSeedData(resourceManager: ResourceManager): Promise<void> {
    const seedDataConfig = this.cfg.get('seed_data');
    if (!seedDataConfig) {
      return;
    }

    for (const entity of Object.keys(seedDataConfig)) {
      const filePath = seedDataConfig[entity];

      let fileContent: string;
      try {
        fileContent = await fs.promises.readFile(filePath, 'utf8');
      } catch (err) {
        this.logger.error(`Failed loading seed ${entity} file`, err);
        continue;
      }

      let seedData: any;
      try {
        // NOTE(review): seed files are assumed to be trusted, operator-provided input
        seedData = yaml.load(fileContent);
      } catch (err) {
        this.logger.error(`Error parsing seed ${entity} file`, err);
        continue;
      }
      this.logger.info(`Loaded ${seedData?.length} seed ${entity}`);

      try {
        // get respective service object for upserting resource
        const service = resourceManager.getResourceService(entity);
        await service.superUpsert({ items: seedData }, undefined);
        this.logger.info(`Seed ${entity} upserted successfully`);
      } catch (err) {
        this.logger.error(`Failed upserting seed ${entity} file`, err);
      }
    }
  }

  /**
   * Handles a `hierarchicalScopesResponse` event.
   *
   * Looks up the subject by the token part of `msg.token` (the text before
   * the first `:`), caches the subject in Redis if it is not cached yet,
   * stores the received hierarchical scopes under the subject's HR scopes key
   * and resolves all waiters registered in `accessController.waiting` for
   * `msg.token`.
   *
   * @param msg event payload; reads `hierarchical_scopes`, `token` and `subject_id`
   */
  private async handleHierarchicalScopesResponse(msg: any): Promise<void> {
    const hierarchicalScopes = msg?.hierarchical_scopes || [];
    const tokenDate = msg?.token;
    // store HR scopes to cache with subjectID
    const subID = msg?.subject_id;
    const token = tokenDate?.split(':')[0];
    let redisHRScopesKey;

    if (token) {
      const subject = await this.accessController.userService.findByToken(FindByTokenRequest.fromPartial({ token }));
      if (subject?.payload) {
        const resolvedSubID = subject.payload.id;
        const tokenFound = _.find(subject.payload.tokens, { token });
        if (tokenFound?.interactive) {
          // interactive tokens use one HR scopes entry per subject
          redisHRScopesKey = `cache:${resolvedSubID}:hrScopes`;
        } else if (tokenFound) {
          // non-interactive tokens get an entry per subject and token
          redisHRScopesKey = `cache:${resolvedSubID}:${token}:hrScopes`;
        }

        const redisSubKey = `cache:${resolvedSubID}:subject`;
        try {
          const redisSub = await this.accessController.getRedisKey(redisSubKey);
          if (_.isEmpty(redisSub)) {
            await this.accessController.setRedisKey(redisSubKey, JSON.stringify(subject.payload));
          }
        } catch (err) {
          this.logger.error('Error retrieving Subject from redis in acs-srv', err);
        }
      }
    }

    try {
      await this.accessController.setRedisKey(redisHRScopesKey, JSON.stringify(hierarchicalScopes));
      this.logger.info(`HR scope saved successfully for Subject ${subID}`);
    } catch (err) {
      this.logger.info('Subject not persisted in redis for updating');
    }

    const waiters = this.accessController.waiting[tokenDate];
    if (waiters) {
      // clear timeout and resolve
      waiters.forEach(waiter => {
        clearTimeout(waiter.timeoutId);
        return waiter.resolve(true);
      });
      delete this.accessController.waiting[tokenDate];
    }
  }

  /**
   * Handles a `userModified` event.
   *
   * Compares the role associations and the scopes of non-interactive tokens
   * in the event with the subject cached in Redis. If they differ, the
   * subject's HR scopes are evicted and the ACS cache is flushed.
   *
   * @param msg event payload; reads `id`, `role_associations` and `tokens`
   * @param commandTopic command topic handed to `flushACSCache`
   */
  private async handleUserModified(msg: any, commandTopic: any): Promise<void> {
    if (!(msg && 'id' in msg)) {
      return;
    }
    const updatedRoleAssocs = msg.role_associations as RoleAssociation[];
    const updatedTokens = msg.tokens;
    const redisSubject = await this.accessController.getRedisKey(`cache:${msg.id}:subject`);
    if (!redisSubject) {
      return;
    }

    const redisRoleAssocs = redisSubject.role_associations;
    const redisTokens = redisSubject.tokens;
    const roleAssocModified = compareRoleAssociations(updatedRoleAssocs, redisRoleAssocs, this.logger);
    let tokensEqual;
    // for interactive login after logout we receive userModified event
    // with empty tokens, so below check is not to evict cache for this case
    if (_.isEmpty(updatedTokens)) {
      tokensEqual = true;
    }
    for (const token of updatedTokens || []) {
      if (!token.interactive) {
        // compare only token scopes (since it now contains last_login as well)
        for (const redisToken of redisTokens || []) {
          if (redisToken.token === token.token) {
            tokensEqual = _.isEqual(redisToken?.scopes?.sort(), token?.scopes?.sort());
          }
        }
        if (!tokensEqual) {
          this.logger.debug('Subject Token scope has been updated', token);
          break;
        }
      } else {
        tokensEqual = true;
      }
    }

    if (roleAssocModified || !tokensEqual || (updatedRoleAssocs?.length !== redisRoleAssocs?.length)) {
      this.logger.info('Evicting HR scope for Subject', { id: msg.id });
      await this.accessController.evictHRScopes(msg.id); // flush HR scopes
      // TODO use tech user below once ACS check is implemented on chassis-srv for command-interface
      // Flush ACS Cache via flushCache Command
      await flushACSCache(msg.id, this.cfg.get('authorization:cache:db-index'), commandTopic, this.logger);
    }
  }

  /**
   * Stops the event bus, the gRPC server, the offset store and the Redis
   * client, in that order. Components that were never initialised (for
   * example after a failed `start()`) are skipped.
   */
  async stop(): Promise<void> {
    await this.events?.stop();
    await this.server?.stop();
    await this.offsetStore?.stop();
    await this.redisClient?.quit();
  }
}
5✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc