How to use the list_project_tags method in tempest

Best Python code snippet using tempest_python

app.py


# -*- coding: utf-8 -*-
################## Summary ##################
'''
About the design architecture:
* File list:
__init__.py _external.py fixture.py opts.py sphinxext.py version.py
_cache_handler.py _i18n.py generator.py policy.py sphinxpolicygen.py
_checks.py _parser.py locale shell.py tests
* Design architecture:
.json --> _parser module --> _checks module --> policy module
  .json:          JSON text file
  _parser module: converts every line into a rule (text-to-rule conversion)
  _checks module: defines the three kinds and the various Rule types
  policy module:  organizes the rules (Rules); the entry point is Enforcer
'''
'''
About rule parsing:
For the detailed rule-parsing process, see _parser.py
'''
'''
About usage:
There are two ways to load rules.
1. Load manually: build the Rules object yourself and pass it to the Enforcer.
2. Let the Enforcer load the rules automatically, in combination with CONF.
'''
'''
About debugging:
python -m pdb test.py
Breakpoints that were set in the past:
b /Users/zong/PythonEnvs/oslo-env/lib/python2.7/site-packages/oslo_policy/policy.py:792
 792     def enforce(self, rule, target, creds, do_raise=False, exc=None,
 793                 *args, **kwargs):
b /Users/zong/PythonEnvs/oslo-env/lib/python2.7/site-packages/oslo_policy/_checks.py:333
 290 @register(None)
 291 class GenericCheck(Check):
 ...
 333     def __call__(self, target, creds, enforcer, current_rule=None):
 334
'''
'''
About the command line:
1. pip install keystone
2. oslopolicy-sample-generator --namespace keystone --format json --output-file policy.json
3. oslopolicy-checker --policy policy.json --access token_admin.json --rule identity:get_project_tag --is_admin=True
   $ passed: identity:get_project_tag
'''
'''
About documentation shortcomings:
The documentation is unclear, and there is no white paper explaining the underlying principles.
What I expect from the oslo_policy module is
a single entry point: authorize!
Around that one entry point: load the rules, validate the rules,
build the user information (creds) that the check needs,
and return a single result: True or False.
Nowhere is the type of creds mentioned; you have to dig through OpenStack's own implementation to find out.
from /usr/share/openstack-dashboard/openstack_dashboard/policy.py:
 30     """Mixin that adds the get_policy_target function
 31
 32     policy_target_attrs - a tuple of tuples which defines
 33         the relationship between attributes in the policy
 34         target dict and attributes in the passed datum object.
 35         policy_target_attrs can be overwritten by sub-classes
 36         which do not use the default, so they can neatly define
 37         their policy target information, without overriding the
 38         entire get_policy_target function.
 39     """
 40
 41     policy_target_attrs = (("project_id", "tenant_id"),
 42                            ("user_id", "user_id"),
 43                            ("domain_id", "domain_id"),
 44                            ("target.project.domain_id", "domain_id"),
 45                            ("target.user.domain_id", "domain_id"),
 46                            ("target.group.domain_id", "domain_id"))
 47
 48     def get_policy_target(self, request, datum=None):
 49         policy_target = {}
 50         for policy_attr, datum_attr in self.policy_target_attrs:
 51             if datum:
 52                 policy_target[policy_attr] = getattr(datum, datum_attr, None)
 53             else:
 54                 policy_target[policy_attr] = None
 55         return policy_target
1263                 "is invalid. It should be one of %(allowed)s")
1264                 % {'feature': feature,
1265                    'allowed': ' '.join(feature_policies.keys())})
1266         role = (('network', policy_name),)
1267         if not policy.check(role, request):
1268             return False
Why is there no register_rule(s), only register_default(s), so that registered_rules cannot be populated through an interface?
Why does no document say that Enforcer.authorize is the interface ultimately exposed to callers?
'''
################## Example code follows ##################
from oslo_policy import policy
from oslo_config import cfg
from oslo_context import context
'''
Demonstrates the rule-matching process.
result is True only when:
1. ctx.roles = ['admin']
2. ctx.project_id = 'myproject'
3. {'target.project.id': 'myproject'}
That is, the rule and the target are used together to match against creds.
ctx = context.RequestContext()
ctx.roles = ['admin']
ctx.project_id = 'myproject'
creds = ctx.to_policy_values()
data = '{"myrule": "project_id:%(target.project.id)s"}'
rules = policy.Rules.load(data)
enforcer = policy.Enforcer(cfg.CONF)
enforcer.set_rules(rules)
enforcer.registered_rules = rules
print(rules.get("myrule"))
print(creds)
result = enforcer.enforce(rules.get("myrule"), {'target.project.id': 'myproject'}, creds)
print(result)
'''
'''
Verifies the usage that finally works: enforcer.enforce(rule (a check object, not a str), ...)
ctx = context.RequestContext()
ctx.roles = ['admin']
creds = ctx.to_policy_values()
with open("policy.json") as fr:
    data = fr.read()
    rules = policy.Rules.load(data)
    enforcer = policy.Enforcer(cfg.CONF)
    enforcer.set_rules(rules)
    enforcer.registered_rules = rules
    print(rules.get("identity:list_project_tags"))
    print(creds)

    result = enforcer.enforce(rules.get("identity:list_project_tags"), {'project': {'id': '23432435345345'}}, creds)
    print(result)
'''
'''
Demonstrates another call that does not work: calling authorize with a BaseCheck object. Because of
 972         if rule not in self.registered_rules:
 973             raise PolicyNotRegistered(rule)
it raises immediately.
ctx = context.RequestContext()
creds = ctx.to_policy_values()
with open("policy.json") as fr:
    data = fr.read()
    rules = policy.Rules.load(data)
    enforcer = policy.Enforcer(cfg.CONF)
    enforcer.set_rules(rules)
    enforcer.registered_rules = rules
    result = enforcer.authorize(rules.get("identity:list_project_tags"), {}, creds)
    # oslo_policy.policy.PolicyNotRegistered: Policy (rule:admin_required or project_id:%(target.project.id)s) has not been registered
    # The rule argument passed to authorize here is an OrCheck, but authorize expects a string so that it can evaluate `if rule in self.registered_rules`.
    # Terrible code!

    print(result)
'''
'''
Demonstrates a call that does not work: authorize(<string>, ...)
ctx = context.RequestContext()
creds = ctx.to_policy_values()
with open("policy.json") as fr:
    data = fr.read()
    rules = policy.Rules.load(data)
    enforcer = policy.Enforcer(cfg.CONF)
    enforcer.set_rules(rules)
    enforcer.registered_rules = rules
    result = enforcer.authorize("identity:list_project_tags", {}, creds)
    # *** AttributeError: AttributeError("'OrCheck' object has no attribute 'scope_types'",)
    # BaseCheck has no scope_types attribute; it was introduced on RuleDefault in version 1.31.
    # So although the documentation says a str rule can be passed to authorize, in practice the call fails when it reaches that branch inside enforce.

    print(result)
'''
'''
Demonstrates assigning a Rules object directly to enforcer.registered_rules.
Note that registered_rules is a dict. A Rules object can be assigned directly because the base class of Rules is dict, and its internal data is stored as a dict as well.
with open("policy.json") as fr:
    data = fr.read()
    rules = policy.Rules.load(data)
    # print(type(rules)) rules is a Rules
    enforcer = policy.Enforcer(cfg.CONF)
    enforcer.set_rules(rules)
    enforcer.registered_rules = rules
    # print(enforcer.registered_rules) registered_rules is a dict.
'''
'''
Verifies generating creds with context.RequestContext
ctx = context.RequestContext()
ctx.roles = ['admin']
ctx.project_id = 'myproject'
data = ctx.to_policy_values()
print(type(data))
import json
print(json.dumps(data._dict, indent=2))
result:
{
  "service_roles": [],
  "user_id": null,
  "roles": [
    "admin"
  ],
  "system_scope": null,
  "service_project_id": null,
  "service_user_id": null,
  "service_user_domain_id": null,
  "service_project_domain_id": null,
  "is_admin_project": true,
  "user_domain_id": null,
  "project_id": "myproject",
  "project_domain_id": null
}
'''
'''
Demonstrates creating an enforcer object and assigning a Rules object to it
with open("policy.json") as fr:
    data = fr.read()
    rules = policy.Rules.load(data)
    enforcer = policy.Enforcer(cfg.CONF)
    enforcer.set_rules(rules)
    print(enforcer.rules)
'''
'''
Demonstrates loading a dict from a file and producing a Rules object via Rules.load
with open("policy.json") as fr:
    data = fr.read()
    rules = policy.Rules.load(data)
    print(rules)
    rule = rules.get("identity:get_project_tag")
    print(dir(rule))
'''
'''
Demonstrates use together with cfg.CONF.
CONF = cfg.CONF
CONF(['--config-file', 'policy.conf'])
rule_name = 'identity:get_domain'
# After creating the enforcer, load_rules must be called manually before the rules show up in enforcer.rules.
enforcer = policy.Enforcer(CONF)
enforcer.load_rules(True)
print("after load_rules: ", enforcer.rules.get(rule_name))
# register_default registers a new rule as a default. My current understanding is that the default is used when nothing else matches, but load_rules has to be called again.
enforcer.register_default(policy.RuleDefault(name='test', check_str="rule:x or role:test or role:admin"))
enforcer.load_rules(True)
print("registered rules: ", enforcer.registered_rules)
print("after test rule registered: ", enforcer.rules.get('test'))
enforcer.register_default(policy.RuleDefault(name='identity:get_domain', check_str="rule:admin_required or project_id:%(target.project.id)s"))
# Rule verification:
print("test pass", enforcer.authorize('identity:get_domain', {}, {'roles': ['test']}))
...
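
The notes above describe a rule such as "project_id:%(target.project.id)s" being matched against creds with the help of the target dict. As a rough illustration of that idea only, here is a minimal standard-library sketch. The helper name generic_check is hypothetical and is not part of oslo.policy; the library's real check classes handle more cases than this. In this sketch only the project_id entry of creds is compared, and roles are not consulted.

```python
# Simplified stand-in for the matching performed by a rule such as
# "project_id:%(target.project.id)s". This is NOT oslo.policy code:
# generic_check is a hypothetical helper written for illustration only.

def generic_check(rule, target, creds):
    """Return True when creds[kind] equals the rule's match value."""
    kind, match_template = rule.split(":", 1)
    try:
        # Fill the %(name)s placeholders from the target dict.
        expected = match_template % target
    except KeyError:
        # The target lacks a key the rule refers to, so the check fails.
        return False
    return kind in creds and str(creds[kind]) == expected


creds = {"roles": ["admin"], "project_id": "myproject"}
rule = "project_id:%(target.project.id)s"

print(generic_check(rule, {"target.project.id": "myproject"}, creds))  # True
print(generic_check(rule, {"target.project.id": "other"}, creds))      # False
```

The second call returns False because the project in the target differs from the project_id in creds, which mirrors the condition listed in the matching notes.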

test_project_tags.py


...
        updated_tags = self.project_tags_client.update_all_project_tags(
            project['id'], tags_to_update)['tags']
        self.assertEqual(sorted(tags_to_update), sorted(updated_tags))
        # Verify that listing project tags works.
        retrieved_tags = self.project_tags_client.list_project_tags(
            project['id'])['tags']
        self.assertEqual(sorted(tags_to_update), sorted(retrieved_tags))
        # Verify that deleting a project tag works.
        self.project_tags_client.delete_project_tag(
            project['id'], tags_to_update[0])
        self.assertRaises(lib_exc.NotFound,
                          self.project_tags_client.check_project_tag_existence,
                          project['id'], tags_to_update[0])
        # Verify that deleting all project tags works.
        self.project_tags_client.delete_all_project_tags(project['id'])
        retrieved_tags = self.project_tags_client.list_project_tags(
            project['id'])['tags']
...
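
To make the update, list, and delete sequence in the excerpt easier to follow without a running OpenStack deployment, here is a small in-memory sketch. FakeProjectTagsClient and the local NotFound class are hypothetical stand-ins written for illustration; they only mimic the method names and the {'tags': [...]} response shape seen in the test above, whereas tempest's real client talks to the Identity service over HTTP.

```python
# In-memory stand-in for the tag operations exercised by the test above.
# FakeProjectTagsClient and NotFound are hypothetical; they are not tempest code.

class NotFound(Exception):
    """Placeholder for the lib_exc.NotFound exception used in the test."""


class FakeProjectTagsClient:
    def __init__(self):
        self._tags = {}  # maps project_id -> list of tags

    def update_all_project_tags(self, project_id, tags):
        # Replace every tag on the project with the given list.
        self._tags[project_id] = list(tags)
        return {"tags": list(tags)}

    def list_project_tags(self, project_id):
        # Same response shape as in the test: a dict with a 'tags' list.
        return {"tags": list(self._tags.get(project_id, []))}

    def check_project_tag_existence(self, project_id, tag):
        if tag not in self._tags.get(project_id, []):
            raise NotFound(tag)

    def delete_project_tag(self, project_id, tag):
        self.check_project_tag_existence(project_id, tag)
        self._tags[project_id].remove(tag)

    def delete_all_project_tags(self, project_id):
        self._tags[project_id] = []


client = FakeProjectTagsClient()
project = {"id": "demo-project"}
tags_to_update = ["blue", "green"]

client.update_all_project_tags(project["id"], tags_to_update)
print(sorted(client.list_project_tags(project["id"])["tags"]))  # ['blue', 'green']

client.delete_project_tag(project["id"], tags_to_update[0])
print(client.list_project_tags(project["id"])["tags"])  # ['green']

client.delete_all_project_tags(project["id"])
print(client.list_project_tags(project["id"])["tags"])  # []
```

As in the test, the list call is made after each mutation and its 'tags' entry is compared with the expected tags.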
