• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

casbin / pycasbin / 5541769856

pending completion
5541769856

Pull #306

github

web-flow
Merge 9f559163f into bacc02b4c
Pull Request #306: feat: update logger for more precise log level and format control

14 of 27 new or added lines in 6 files covered. (51.85%)

2061 of 2414 relevant lines covered (85.38%)

7.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.85
/casbin/core_enforcer.py
1
# Copyright 2021 The casbin Authors. All Rights Reserved.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#      http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

15
import logging
9✔
16
import copy
9✔
17

18
from casbin.effect import Effector, get_effector, effect_to_bool
9✔
19
from casbin.model import Model, FunctionMap
9✔
20
from casbin.persist import Adapter
9✔
21
from casbin.persist.adapters import FileAdapter
9✔
22
from casbin.rbac import default_role_manager
9✔
23
from casbin.util import generate_g_function, SimpleEval, util
9✔
24
from casbin.util.log import configure_logging
9✔
25

26

27
class EnforceContext:
    """
    EnforceContext is used as the first element of the parameter "rvals" in method "enforce".

    It bundles the four section keys (request, policy, effect, matcher) that
    one enforcement call should use.
    """

    def __init__(self, rtype: str, ptype: str, etype: str, mtype: str):
        # Stored verbatim; no validation is performed here.
        self.rtype, self.ptype = rtype, ptype
        self.etype, self.mtype = etype, mtype
37

38

39
class CoreEnforcer:
    """CoreEnforcer defines the core functionality of an enforcer."""

    # Class-level defaults. The init_* methods and _initialize() assign the
    # real per-instance values.

    # Path of the model file (set when the enforcer is built from a path).
    model_path = ""

    # Model, function map and effector.
    model = fm = eft = None

    # Policy adapter, watcher and the ptype -> role manager mapping.
    adapter = watcher = rm_map = None

    # Behaviour switches; all off until _initialize() turns them on.
    enabled = auto_save = auto_build_role_links = auto_notify_watcher = False
55

56
    def __init__(self, model=None, adapter=None, enable_log=False):
9✔
57
        self.logger = logging.getLogger("casbin.enforcer")
9✔
58
        if isinstance(model, str):
9✔
59
            if isinstance(adapter, str):
9✔
60
                self.init_with_file(model, adapter)
9✔
61
            else:
62
                self.init_with_adapter(model, adapter)
9✔
63
                pass
9✔
64
        else:
65
            if isinstance(adapter, str):
9✔
66
                raise RuntimeError("Invalid parameters for enforcer.")
×
67
            else:
68
                self.init_with_model_and_adapter(model, adapter)
9✔
69

70
        if enable_log:
9✔
NEW
71
            configure_logging()
×
72

73
    def init_with_file(self, model_path, policy_path):
        """initializes an enforcer with a model file and a policy file.

        The policy path is wrapped in a FileAdapter and the rest is delegated
        to init_with_adapter().
        """
        self.init_with_adapter(model_path, FileAdapter(policy_path))
77

78
    def init_with_adapter(self, model_path, adapter=None):
        """initializes an enforcer with a database adapter.

        The model is built from model_path via new_model(); model_path is
        recorded on the instance only after initialization has succeeded.
        """
        loaded_model = self.new_model(model_path)
        self.init_with_model_and_adapter(loaded_model, adapter)

        self.model_path = model_path
84

85
    def init_with_model_and_adapter(self, m, adapter=None):
        """initializes an enforcer with a model and a database adapter.

        Raises RuntimeError if m is not a Model, or if adapter is given but is
        not an Adapter.
        """
        if not isinstance(m, Model):
            raise RuntimeError("Invalid parameters for enforcer.")
        if adapter is not None and not isinstance(adapter, Adapter):
            raise RuntimeError("Invalid parameters for enforcer.")

        self.adapter = adapter

        self.model = m
        m.print_model()
        self.fm = FunctionMap.load_function_map()

        self._initialize()

        # Do not initialize the full policy when using a filtered adapter
        if self.adapter and not self.is_filtered():
            self.load_policy()
102

103
    def _initialize(self):
        """resets the runtime state: role managers, effector, watcher and switches."""
        self.rm_map = {}
        # The effector is chosen from the model's "e" definition.
        self.eft = get_effector(self.model["e"]["e"].value)
        self.watcher = None

        # All behaviour switches start enabled.
        self.enabled = self.auto_save = True
        self.auto_build_role_links = self.auto_notify_watcher = True

        self.init_rm_map()
114

115
    @staticmethod
    def new_model(path="", text=""):
        """creates a model.

        A non-empty path is loaded as a model file; otherwise the model is
        loaded from text.
        """
        created = Model()
        if len(path) == 0:
            created.load_model_from_text(text)
        else:
            created.load_model(path)

        return created
126

127
    def load_model(self):
        """reloads the model from the model CONF file.

        The policy is attached to the model, so reloading the model invalidates
        it; call load_policy() afterwards to load it again.
        """
        reloaded = self.new_model()
        # Assign first so the instance already points at the new model while it loads.
        self.model = reloaded
        reloaded.load_model(self.model_path)
        reloaded.print_model()
        self.fm = FunctionMap.load_function_map()
136

137
    def get_model(self):
9✔
138
        """gets the current model."""
139

140
        return self.model
9✔
141

142
    def set_model(self, m):
        """replaces the current model and reloads the function map."""
        self.model = m
        # The function map is reloaded whenever the model changes.
        self.fm = FunctionMap.load_function_map()
147

148
    def get_adapter(self):
9✔
149
        """gets the current adapter."""
150

151
        return self.adapter
9✔
152

153
    def set_adapter(self, adapter):
9✔
154
        """sets the current adapter."""
155

156
        self.adapter = adapter
×
157

158
    def set_watcher(self, watcher):
9✔
159
        """sets the current watcher."""
160

161
        self.watcher = watcher
9✔
162
        pass
9✔
163

164
    def get_role_manager(self):
9✔
165
        """gets the current role manager."""
166
        return self.rm_map["g"]
9✔
167

168
    def get_named_role_manager(self, ptype):
9✔
169
        if ptype in self.rm_map.keys():
×
170
            return self.rm_map.get(ptype)
×
171
        raise ValueError("ptype not found")
×
172

173
    def set_role_manager(self, rm):
9✔
174
        """sets the current role manager."""
175
        self.rm_map["g"] = rm
×
176

177
    def set_named_role_manager(self, ptype, rm):
9✔
178
        self.rm_map[ptype] = rm
×
179

180
    def set_effector(self, eft):
9✔
181
        """sets the current effector."""
182
        self.eft = eft
×
183

184
    def clear_policy(self):
9✔
185
        """clears all policy."""
186

187
        self.model.clear_policy()
×
188

189
    def init_rm_map(self):
        """creates one role manager per "g" definition in the model.

        A definition whose value contains exactly two "_" placeholders gets a
        RoleManager; any other definition gets a DomainManager.
        """
        if "g" not in self.model.keys():
            return

        for ptype, assertion in self.model["g"].items():
            if assertion.value.count("_") == 2:
                manager = default_role_manager.RoleManager(10)
            else:
                manager = default_role_manager.DomainManager(10)
            self.rm_map[ptype] = manager
197

198
    def load_policy(self):
9✔
199
        """reloads the policy from file/database."""
200
        need_to_rebuild = False
9✔
201
        new_model = copy.deepcopy(self.model)
9✔
202
        new_model.clear_policy()
9✔
203

204
        try:
9✔
205

206
            self.adapter.load_policy(new_model)
9✔
207

208
            new_model.sort_policies_by_subject_hierarchy()
9✔
209

210
            new_model.sort_policies_by_priority()
9✔
211

212
            new_model.print_policy()
9✔
213

214
            if self.auto_build_role_links:
9✔
215

216
                need_to_rebuild = True
9✔
217
                for rm in self.rm_map.values():
9✔
218
                    rm.clear()
9✔
219

220
                new_model.build_role_links(self.rm_map)
9✔
221

222
            self.model = new_model
9✔
223

224
        except Exception as e:
×
225

226
            if self.auto_build_role_links and need_to_rebuild:
×
227
                self.build_role_links()
×
228

229
            raise e
×
230

231
    def load_filtered_policy(self, filter):
9✔
232
        """reloads a filtered policy from file/database."""
233
        self.model.clear_policy()
9✔
234

235
        if not hasattr(self.adapter, "is_filtered"):
9✔
236
            raise ValueError("filtered policies are not supported by this adapter")
9✔
237

238
        self.adapter.load_filtered_policy(self.model, filter)
9✔
239

240
        self.model.sort_policies_by_priority()
9✔
241

242
        self.init_rm_map()
9✔
243
        self.model.print_policy()
9✔
244
        if self.auto_build_role_links:
9✔
245
            self.build_role_links()
9✔
246

247
    def load_increment_filtered_policy(self, filter):
9✔
248
        """LoadIncrementalFilteredPolicy append a filtered policy from file/database."""
249
        if not hasattr(self.adapter, "is_filtered"):
9✔
250
            raise ValueError("filtered policies are not supported by this adapter")
×
251

252
        self.adapter.load_filtered_policy(self.model, filter)
9✔
253
        self.model.print_policy()
9✔
254
        if self.auto_build_role_links:
9✔
255
            self.build_role_links()
9✔
256

257
    def is_filtered(self):
9✔
258
        """returns true if the loaded policy has been filtered."""
259

260
        return hasattr(self.adapter, "is_filtered") and self.adapter.is_filtered()
9✔
261

262
    def save_policy(self):
9✔
263
        if self.is_filtered():
9✔
264
            raise RuntimeError("cannot save a filtered policy")
9✔
265

266
        self.adapter.save_policy(self.model)
9✔
267

268
        if self.watcher:
9✔
269
            if callable(getattr(self.watcher, "update_for_save_policy", None)):
9✔
270
                self.watcher.update_for_save_policy(self.model)
9✔
271
            else:
272
                self.watcher.update()
×
273

274
    def enable_enforce(self, enabled=True):
9✔
275
        """changes the enforcing state of Casbin,
276
        when Casbin is disabled, all access will be allowed by the Enforce() function.
277
        """
278

279
        self.enabled = enabled
×
280

281
    def enable_auto_save(self, auto_save):
9✔
282
        """controls whether to save a policy rule automatically to the adapter when it is added or removed."""
283
        self.auto_save = auto_save
×
284

285
    def enable_auto_build_role_links(self, auto_build_role_links):
9✔
286
        """controls whether to rebuild the role inheritance relations when a role is added or deleted."""
287
        self.auto_build_role_links = auto_build_role_links
×
288

289
    def enable_auto_notify_watcher(self, auto_notify_watcher):
9✔
290
        """controls whether to save a policy rule automatically notify the watcher when it is added or removed."""
291
        self.auto_notify_watcher = auto_notify_watcher
9✔
292

293
    def build_role_links(self):
9✔
294
        """manually rebuild the role inheritance relations."""
295

296
        for rm in self.rm_map.values():
9✔
297
            rm.clear()
9✔
298

299
        self.model.build_role_links(self.rm_map)
9✔
300

301
    def add_named_matching_func(self, ptype, fn):
9✔
302
        """add_named_matching_func add MatchingFunc by ptype RoleManager"""
303
        try:
9✔
304
            self.rm_map[ptype].add_matching_func(fn)
9✔
305
            return True
9✔
306
        except:
×
307
            return False
×
308

309
    def add_named_domain_matching_func(self, ptype, fn):
9✔
310
        """add_named_domain_matching_func add MatchingFunc by ptype to RoleManager"""
311
        if ptype in self.rm_map.keys():
×
312
            self.rm_map[ptype].add_domain_matching_func(fn)
×
313
            return True
×
314

315
        return False
×
316

317
    def new_enforce_context(self, suffix: str) -> EnforceContext:
        """builds an EnforceContext whose keys are "r", "p", "e" and "m" with suffix appended."""
        # Order matches EnforceContext(rtype, ptype, etype, mtype).
        keys = [prefix + suffix for prefix in ("r", "p", "e", "m")]
        return EnforceContext(*keys)
325

326
    def enforce(self, *rvals):
9✔
327
        """decides whether a "subject" can access a "object" with the operation "action",
328
        input parameters are usually: (sub, obj, act).
329
        """
330
        result, _ = self.enforce_ex(*rvals)
9✔
331
        return result
9✔
332

333
    def enforce_ex(self, *rvals):
9✔
334
        """decides whether a "subject" can access a "object" with the operation "action",
335
        input parameters are usually: (sub, obj, act).
336
        return judge result with reason
337
        """
338

339
        rtype = "r"
9✔
340
        ptype = "p"
9✔
341
        etype = "e"
9✔
342
        mtype = "m"
9✔
343

344
        if not self.enabled:
9✔
345
            return [True, []]
×
346

347
        functions = self.fm.get_functions()
9✔
348

349
        if "g" in self.model.keys():
9✔
350
            for key, ast in self.model["g"].items():
9✔
351
                rm = ast.rm
9✔
352
                functions[key] = generate_g_function(rm)
9✔
353

354
        if len(rvals) != 0:
9✔
355
            if isinstance(rvals[0], EnforceContext):
9✔
356
                enforce_context = rvals[0]
9✔
357
                rtype = enforce_context.rtype
9✔
358
                ptype = enforce_context.ptype
9✔
359
                etype = enforce_context.etype
9✔
360
                mtype = enforce_context.mtype
9✔
361
                rvals = rvals[1:]
9✔
362

363
        if "m" not in self.model.keys():
9✔
364
            raise RuntimeError("model is undefined")
×
365

366
        if "m" not in self.model["m"].keys():
9✔
367
            raise RuntimeError("model is undefined")
×
368

369
        r_tokens = self.model["r"][rtype].tokens
9✔
370
        p_tokens = self.model["p"][ptype].tokens
9✔
371

372
        if len(r_tokens) != len(rvals):
9✔
373
            raise RuntimeError("invalid request size")
×
374

375
        exp_string = self.model["m"][mtype].value
9✔
376
        exp_has_eval = util.has_eval(exp_string)
9✔
377
        if not exp_has_eval:
9✔
378
            expression = self._get_expression(exp_string, functions)
9✔
379

380
        policy_effects = set()
9✔
381

382
        r_parameters = dict(zip(r_tokens, rvals))
9✔
383

384
        policy_len = len(self.model["p"][ptype].policy)
9✔
385

386
        explain_index = -1
9✔
387
        if not 0 == policy_len:
9✔
388
            for i, pvals in enumerate(self.model["p"][ptype].policy):
9✔
389
                if len(p_tokens) != len(pvals):
9✔
390
                    raise RuntimeError("invalid policy size")
×
391

392
                p_parameters = dict(zip(p_tokens, pvals))
9✔
393
                parameters = dict(r_parameters, **p_parameters)
9✔
394

395
                if exp_has_eval:
9✔
396
                    rule_names = util.get_eval_value(exp_string)
9✔
397
                    rules = [util.escape_assertion(p_parameters[rule_name]) for rule_name in rule_names]
9✔
398
                    exp_with_rule = util.replace_eval(exp_string, rules)
9✔
399
                    expression = self._get_expression(exp_with_rule, functions)
9✔
400

401
                result = expression.eval(parameters)
9✔
402

403
                if isinstance(result, bool):
9✔
404
                    if not result:
9✔
405
                        policy_effects.add(Effector.INDETERMINATE)
9✔
406
                        continue
9✔
407
                elif isinstance(result, float):
×
408
                    if 0 == result:
×
409
                        policy_effects.add(Effector.INDETERMINATE)
×
410
                        continue
×
411
                else:
412
                    raise RuntimeError("matcher result should be bool, int or float")
×
413

414
                p_eft_key = ptype + "_eft"
9✔
415
                if p_eft_key in parameters.keys():
9✔
416
                    eft = parameters[p_eft_key]
9✔
417
                    if "allow" == eft:
9✔
418
                        policy_effects.add(Effector.ALLOW)
9✔
419
                    elif "deny" == eft:
9✔
420
                        policy_effects.add(Effector.DENY)
9✔
421
                    else:
422
                        policy_effects.add(Effector.INDETERMINATE)
9✔
423
                else:
424
                    policy_effects.add(Effector.ALLOW)
9✔
425

426
                if self.eft.intermediate_effect(policy_effects) != Effector.INDETERMINATE:
9✔
427
                    explain_index = i
9✔
428
                    break
9✔
429

430
        else:
431
            if exp_has_eval:
9✔
432
                raise RuntimeError("please make sure rule exists in policy when using eval() in matcher")
×
433

434
            parameters = r_parameters.copy()
9✔
435

436
            for token in self.model["p"][ptype].tokens:
9✔
437
                parameters[token] = ""
9✔
438

439
            result = expression.eval(parameters)
9✔
440

441
            if result:
9✔
442
                policy_effects.add(Effector.ALLOW)
9✔
443
            else:
444
                policy_effects.add(Effector.INDETERMINATE)
9✔
445

446
        final_effect = self.eft.final_effect(policy_effects)
9✔
447
        result = effect_to_bool(final_effect)
9✔
448

449
        # Log request.
450

451
        req_str = "Request: "
9✔
452
        req_str = req_str + ", ".join([str(v) for v in rvals])
9✔
453

454
        req_str = req_str + " ---> %s" % result
9✔
455
        if result:
9✔
456
            self.logger.info(req_str)
9✔
457
        else:
458
            # leaving this in error for now, if it's very noise this can be changed to info or debug,
459
            # or change the log level
460
            self.logger.error(req_str)
9✔
461

462
        explain_rule = []
9✔
463
        if explain_index != -1 and explain_index < policy_len:
9✔
464
            explain_rule = self.model["p"][ptype].policy[explain_index]
9✔
465

466
        return result, explain_rule
9✔
467

468
    def batch_enforce(self, rvals):
9✔
469
        """batch_enforce enforce in batches"""
470
        results = []
9✔
471
        for request in rvals:
9✔
472
            result = self.enforce(*request)
9✔
473
            results.append(result)
9✔
474
        return results
9✔
475

476
    @staticmethod
    def configure_logging(logging_config=None):
        """configures the default logger for casbin.

        Forwards logging_config to the module-level configure_logging() helper.
        """
        configure_logging(logging_config)
480

481
    @staticmethod
    def _get_expression(expr, functions=None):
        """translates a matcher into Python operator syntax and wraps it in a SimpleEval.

        "&&" becomes "and", "||" becomes "or" and a logical "!" becomes "not".
        The inequality operator "!=" is left untouched.

        NOTE(review): the substitution is purely textual, so these character
        sequences inside string literals of the matcher are rewritten as well.
        """
        import re  # function-scope import keeps this block self-contained

        # Pad with spaces so unspaced forms such as "a&&b" do not fuse into "aandb".
        expr = expr.replace("&&", " and ")
        expr = expr.replace("||", " or ")
        # Only a "!" that is not part of "!=" is a logical not. The trailing
        # space keeps "!x" from turning into the identifier "notx".
        expr = re.sub(r"!(?!=)", "not ", expr)

        return SimpleEval(expr, functions)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc