Best Python code snippet using tempest_python
app.py
Source:app.py
1# -*- coding: utf-82################## æ»ç» ##################3'''4å
³äºè®¾è®¡æ¶æï¼5* æ件å表ï¼6__init__.py _external.py fixture.py opts.py sphinxext.py version.py7_cache_handler.py _i18n.py generator.py policy.py sphinxpolicygen.py8_checks.py _parser.py locale shell.py tests9* 设计æ¶æï¼10.json --> _parser module --> _checks module --> policy module11json ææ¬æ件 è´è´£å°ææçè¡è½¬åæè§å å®ä¹ä¸ç§kindï¼ååç§Rule å®ç°è§åçç»ç»Rulesï¼å
¥å£å½æ°Enforcer12 ï¼ææ¬å°è§åç转åï¼13'''14'''15å
³äºè§å解æï¼16å
·ä½çè§å解æè¿ç¨åè§ _parser.py17'''18'''19å
³äºä½¿ç¨æ¹æ³ï¼20两ç§æ¹æ³å®ç°è§åçå è½½ã211. æå¨å è½½ï¼èªå·±çæRules对象ï¼ä¼ éç»Enforcer222. 使ç¨Enforcerèªå¨å è½½ï¼åCONFé
å使ç¨ã23'''24'''25å
³äºè°è¯è¿ç¨ï¼26python -m test.py 27æ¾ç»è¢«å®ä½è¿çç¹ï¼28b /Users/zong/PythonEnvs/oslo-env/lib/python2.7/site-packages/oslo_policy/policy.py:79229 792 def enforce(self, rule, target, creds, do_raise=False, exc=None,30 793 *args, **kwargs):31b /Users/zong/PythonEnvs/oslo-env/lib/python2.7/site-packages/oslo_policy/_checks.py:33332 290 @register(None)33 291 class GenericCheck(Check):34 ...35 333 def __call__(self, target, creds, enforcer, current_rule=None):36 33437'''38'''39å
³äºå½ä»¤è¡ï¼401. pip install keystone412. oslopolicy-sample-generator --namespace keystone --format json --output-file policy.json423. oslopolicy-checker --policy policy.json --access token_admin.json --rule identity:get_project_tag --is_admin=True43 $ passed: identity:get_project_tag44'''45'''46å
³äºæ档缺é·ï¼47æè§ææ¡£åçä¸æä¸ç½ã缺å°åçç½ç®ä¹¦ï¼48æçææçoslo_policy 模å æ¯ 49ç»ææä¾ä¸ä¸ªå
¥å£ï¼authorize! 50å´ç»è¿ä¸ä¸ªå
¥å£ï¼å è½½è§åï¼éªè¯è§åï¼51æ建éªè¯è¿ç¨æéè¦çç¨æ·ä¿¡æ¯credsï¼52åºå£ä¸ä¸ªï¼True or False53没æå°æ¹æå°credsçç±»åãèªå·±å»æ¾openstackçå®ç°ææ¡£ã54from /usr/share/openstack-dashboard/openstack_dashboard/policy.py:55 30 """Mixin that adds the get_policy_target function56 3157 32 policy_target_attrs - a tuple of tuples which defines58 33 the relationship between attributes in the policy59 34 target dict and attributes in the passed datum object.60 35 policy_target_attrs can be overwritten by sub-classes61 36 which do not use the default, so they can neatly define62 37 their policy target information, without overriding the63 38 entire get_policy_target function.64 39 """65 4066 41 policy_target_attrs = (("project_id", "tenant_id"),67 42 ("user_id", "user_id"),68 43 ("domain_id", "domain_id"),69 44 ("target.project.domain_id", "domain_id"),70 45 ("target.user.domain_id", "domain_id"),71 46 ("target.group.domain_id", "domain_id"))72 4773 48 def get_policy_target(self, request, datum=None):74 49 policy_target = {}75 50 for policy_attr, datum_attr in self.policy_target_attrs:76 51 if datum:77 52 policy_target[policy_attr] = getattr(datum, datum_attr, None)78 53 else:79 54 policy_target[policy_attr] = None80 55 return policy_target811263 "is invalid. It should be one of %(allowed)s")821264 % {'feature': feature,831265 'allowed': ' '.join(feature_policies.keys())})841266 role = (('network', policy_name),)851267 if not policy.check(role, request):861268 return False87为ä»ä¹ 没æ register_rule(s), èåªæregister_default(s)ï¼å¯¼è´registered_rulesæ æ³éè¿æ¥å£èµå¼ã88为ä»ä¹æ²¡æä¸ä¸ªææ¡£åè¯æ说Enforcer.authorize å°±æ¯æç»å¾å¤æä¾çæ¥å£ã 89'''90################## 以ä¸æ¯ç¤ºä¾ä»£ç ##################91from oslo_policy import policy92from oslo_config import cfg93from oslo_context import context94'''95æ¼ç¤º è§åå¹é
è¿ç¨ï¼96result 为Trueåªæå½ï¼ 971. ctx.roles = ['admin']982. ctx.project_id = 'myproject'993. {'target.project.id': 'myproject'}100å³ï¼è§ååtargetä¼è¢«ç¨æ¥å¹é
creds.101ctx = context.RequestContext()102ctx.roles = ['admin']103ctx.project_id = 'myproject'104creds = ctx.to_policy_values()105data = '{"myrule": "project_id:%(target.project.id)s"}'106rules = policy.Rules.load(data)107enforcer = policy.Enforcer(cfg.CONF)108enforcer.set_rules(rules)109enforcer.registered_rules = rules110print(rules.get("myrule"))111print(creds)112result = enforcer.enforce(rules.get("myrule"), {'target.project.id': 'myproject'}, creds)113print(result)114'''115'''116éªè¯æç»æ£ç¡®ç使ç¨æ¹å¼ï¼enforcer.enforce(rule(not str), ...)117ctx = context.RequestContext()118ctx.roles = ['admin']119creds = ctx.to_policy_values()120with open("policy.json") as fr:121 data = fr.read()122 rules = policy.Rules.load(data)123 enforcer = policy.Enforcer(cfg.CONF)124 enforcer.set_rules(rules)125 enforcer.registered_rules = rules126 print(rules.get("identity:list_project_tags"))127 print(creds)128 129 result = enforcer.enforce(rules.get("identity:list_project_tags"), {'project': {'id': '23432435345345'}}, creds)130 print(result)131'''132'''133æ¼ç¤ºå¦å¤ä¸ç§ä¸å¯ä»¥çè°ç¨æ¹å¼ï¼authorize使ç¨BaseCheck ç±»åè°ç¨ãä»ä¼å 为134 972 if rule not in self.registered_rules:135 973 raise PolicyNotRegistered(rule)136 ç´æ¥æ¥éã137ctx = context.RequestContext()138creds = ctx.to_policy_values()139with open("policy.json") as fr:140 data = fr.read()141 rules = policy.Rules.load(data)142 enforcer = policy.Enforcer(cfg.CONF)143 enforcer.set_rules(rules)144 enforcer.registered_rules = rules145 result = enforcer.authorize(rules.get("identity:list_project_tags"), {}, creds)146 # oslo_policy.policy.PolicyNotRegistered: Policy (rule:admin_required or project_id:%(target.project.id)s) has not been registered147 # 代ç ä¸æ¾ç¤º authorize åæ° ruleï¼å¨è¿éä¼ å
¥çæ¯OrCheckç±»åï¼èå
¶è¦æ±æ¯ stringç±»åï¼ä»¥ä¾¿è½å¤æ if rule in self.registered_rules148 # çcodeï¼149 150 print(result)151'''152'''153æ¼ç¤ºä¸ç§ä¸å¯ä»¥çè°ç¨æ¹å¼ï¼authorize(<string>, ...)154ctx = context.RequestContext()155creds = ctx.to_policy_values()156with open("policy.json") as fr:157 data = fr.read()158 rules = policy.Rules.load(data)159 enforcer = policy.Enforcer(cfg.CONF)160 enforcer.set_rules(rules)161 enforcer.registered_rules = rules162 result = enforcer.authorize("identity:list_project_tags", {}, creds)163 # *** AttributeError: AttributeError("'OrCheck' object has no attribute 'scope_types'",)164 # ç°æBaseCheckä¸è¿æ²¡æscope_types 1.31çæ¬çRuleDefaultä¸å¼å
¥ã165 # æ以ææ¡£ä¸è®²ï¼å¯ä»¥ä½¿ç¨str ç±»åçrule ä½ä¸ºauthorize åæ°ï¼ä½å®é
ä¸è¿è¡è³ enforceå½æ°ä¸çåæ¯æ¶åºç°é®é¢ã166 167 print(result)168'''169'''170æ¼ç¤ºç´æ¥å¯¹enforcer.registered_rules 使ç¨Rules 对象èµå¼ã171注æregistered_rules æ¯dictç±»åï¼èä¹æ以ç´æ¥å¯ä»¥ç¨Rules对象èµå¼æ¯å 为Rulesçåºç±»å°±æ¯dictï¼å
¶å
é¨æ°æ®æ ¼å¼ä¹æ¯dictã172with open("policy.json") as fr:173 data = fr.read()174 rules = policy.Rules.load(data)175 # print(type(rules)) rules is a Rules176 enforcer = policy.Enforcer(cfg.CONF)177 enforcer.set_rules(rules)178 enforcer.registered_rules = rules179 # print(enforcer.registered_rules) registered_rules is a dict.180'''181'''182éªè¯ 使ç¨context.RequestContext çæcreds183ctx = context.RequestContext()184ctx.roles = ['admin']185ctx.project_id = 'myproject'186data = ctx.to_policy_values()187print(type(data))188import json189print(json.dumps(data._dict, indent=2))190result: 191{192 "service_roles": [],193 "user_id": null,194 "roles": [195 "admin"196 ],197 "system_scope": null,198 "service_project_id": null,199 "service_user_id": null,200 "service_user_domain_id": null,201 "service_project_domain_id": null,202 "is_admin_project": true,203 "user_domain_id": null,204 "project_id": "myproject",205 "project_domain_id": null206}207'''208'''209æ¼ç¤º çæenforcer 对象ï¼å¹¶å°Rules 对象èµå¼å°enforcer210with open("policy.json") as fr:211 data = fr.read()212 rules = policy.Rules.load(data)213 enforcer = policy.Enforcer(cfg.CONF)214 enforcer.set_rules(rules)215 print(enforcer.rules)216''' 217'''218æ¼ç¤ºä»æ件ä¸å è½½dictï¼éè¿Rules.load çæRules ç±»å对象219with open("policy.json") as fr:220 data = fr.read()221 rules = policy.Rules.load(data)222 print(rules)223 rule = rules.get("identity:get_project_tag")224 print(dir(rule))225'''226'''227æ¼ç¤º åcfg.CONF ä¸èµ·ä½¿ç¨.228CONF = cfg.CONF229CONF(['--config-file', 'policy.conf'])230rule_name = 'identity:get_domain'231# å建enforcerä¹åéè¦æå¨load_rules æè½ä½ç°å¨enforcer.rulesä¸ã232enforcer = policy.Enforcer(CONF)233enforcer.load_rules(True)234print("after load_rules: ", enforcer.rules.get(rule_name))235# éè¿register_default å¯ä»¥æ³¨åæ°çè§åï¼è¿ä¸ªè§åæ¯defaultï¼ç®åç解æ¯å½ææé½å¹é
ä¸ä¸çæ¶åä¼ä½¿ç¨defaultï¼ä½æ¯éè¦éæ°load_rulesã236enforcer.register_default(policy.RuleDefault(name='test', check_str="rule:x or role:test or role:admin"))237enforcer.load_rules(True)238print("registered rules: ", enforcer.registered_rules)239print("after test rule registered: ", enforcer.rules.get('test'))240enforcer.register_default(policy.RuleDefault(name='identity:get_domain', check_str="rule:admin_required or project_id:%(target.project.id)s"))241# è§åçéªè¯è¿ç¨ï¼ 242print("test pass", enforcer.authorize('identity:get_domain', {}, {'roles': ['test']}))...
test_project_tags.py
Source:test_project_tags.py
...41 updated_tags = self.project_tags_client.update_all_project_tags(42 project['id'], tags_to_update)['tags']43 self.assertEqual(sorted(tags_to_update), sorted(updated_tags))44 # Verify that listing project tags works.45 retrieved_tags = self.project_tags_client.list_project_tags(46 project['id'])['tags']47 self.assertEqual(sorted(tags_to_update), sorted(retrieved_tags))48 # Verify that deleting a project tag works.49 self.project_tags_client.delete_project_tag(50 project['id'], tags_to_update[0])51 self.assertRaises(lib_exc.NotFound,52 self.project_tags_client.check_project_tag_existence,53 project['id'], tags_to_update[0])54 # Verify that deleting all project tags works.55 self.project_tags_client.delete_all_project_tags(project['id'])56 retrieved_tags = self.project_tags_client.list_project_tags(57 project['id'])['tags']...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!