Best Python code snippet using localstack_python
test_rules_api.py
Source:test_rules_api.py  
...87            body.update({'active': active})88        if priority is not None:89            body.update({'priority': priority})90        return body91    def _create_rule(self, name, org_id=None, pool_id="", owner_id="",92                     conditions=None, active=None, priority=None,93                     set_allowed=True):94        if not org_id:95            org_id = self.org_id96        if not conditions:97            conditions = [98                {"type": "name_starts_with", "meta_info": "name_starts_with"}99            ]100        if pool_id == "":101            pool_id = self.org_pool_id102        if owner_id == "":103            owner_id = self.employee['id']104        if set_allowed:105            _, employee = self.client.employee_get(owner_id)106            self.set_allowed_pair(employee['auth_user_id'], pool_id)107        rule_body = self._prepare_rule_body(108            name=name,109            pool_id=pool_id,110            owner_id=owner_id,111            conditions=conditions,112            active=active,113            priority=priority,114        )115        code, rule = self.client.rules_create(org_id, rule_body)116        self.assertEqual(code, 201, rule)117        return rule118    def _test_params(self, call, params, *args, required_parameters=None,119                     entities_should_exist=None):120        data = params.copy()121        unexpected_param_name = "unexpected_param"122        data[unexpected_param_name] = self.gen_id()123        code, response = call(*args, data)124        self.assertEqual(code, 400)125        self.assertTrue('Unexpected parameters' in response['error']['reason'])126        self.assertTrue(unexpected_param_name in response['error']['reason'])127        if required_parameters:128            for param in required_parameters:129                data = params.copy()130                data.pop(param)131                code, response = call(*args, data)132                self.assertEqual(code, 400)133         
       self.assertEqual(response['error']['reason'],134                                 '%s is not provided' % param)135        if entities_should_exist:136            for param in entities_should_exist:137                data = params.copy()138                id = str(self.gen_id())139                data[param] = id140                code, response = call(*args, data)141                self.assertEqual(code, 400)142                self.assertTrue('%s %s doesn\'t exist' % (param, id)143                                in response['error']['reason'])144                self.assertTrue(response['error']['error_code'], 'OE0212')145class TestRuleApi(TestRulesApiBase):146    @patch(AUTHORIZE_ACTION_METHOD)147    def test_create_rule(self, p_authorize):148        """149        Basic create rule request flow.150        Steps:151           - 1. Check initial count of rules. Should be 0.152           - 2. Create FullPair rule.153           - 3. Verify response code is 201.154           - 4. Verify count rules is 1.155           - 12. 
Create rule in inactive state.156             - verify state157        """158        p_authorize.return_value = True159        rules = self._get_rules()160        self.assertEqual(len(rules), 0)161        full_rule = self._create_rule('Test rule')162        self.assertEqual(self.p_activities_publish.call_count, 1)163        self.assertEqual(full_rule['pool_name'], self.organization['name'])164        self.assertEqual(full_rule['pool_purpose'], 'business_unit')165        self.assertEqual(full_rule['owner_name'], self.employee['name'])166        self.assertEqual(full_rule['creator_name'], self.employee['name'])167        self.assertEqual(full_rule['priority'], 1)168        rules = self._get_rules()169        self.assertEqual(len(rules), 1)170        self.assertEqual(rules[0]['id'], full_rule['id'])171        inactive = self._create_rule("Inactive rule",172                                     active=False)173        self.assertEqual(self.p_activities_publish.call_count, 2)174        code, rule = self.client.rule_get(inactive['id'])175        self.assertEqual(code, 200)176        self.assertEqual(rule['active'], False)177        rules = self._get_rules()178        self.assertEqual(len(rules), 2)179        code, rules = self.client.rules_list(self.org_id,180                                             valid_rules_only=True)181        self.assertEqual(code, 200)182        all_rules = rules['rules']183        for rule in all_rules:184            self.assertTrue(rule['id'] != inactive['id'])185            self.assertIn('priority', rule)186    @patch(AUTHORIZE_ACTION_METHOD)187    def test_create_rule_with_priority(self, p_authorize):188        """189           - 1. Check initial count of rules. Should be 0.190           - 2. Create 2 rules191           - 3. Create rules with priority 2192           - 4. 
Get list of rules and check priorities193        """194        p_authorize.return_value = True195        rules = self._get_rules()196        self.assertEqual(len(rules), 0)197        rule1 = self._create_rule('rule1')198        rule3 = self._create_rule('rule3')199        self.assertEqual(self.p_activities_publish.call_count, 2)200        rules = self._get_rules()201        self.assertEqual(len(rules), 2)202        rule2 = self._create_rule('rule2', priority=2)203        self.assertEqual(self.p_activities_publish.call_count, 3)204        self.assertEqual(rule2['priority'], 2)205        rule_body = self._prepare_rule_body(206            name='rule4',207            pool_id=self.org_pool_id,208            owner_id=self.employee['id'],209            conditions=[{"type": "name_starts_with", "meta_info": "name_starts_with"}],210            active=None,211            priority=5,212        )213        code, resp = self.client.rules_create(self.org_id, rule_body)214        self.assertEqual(code, 400)215        self.verify_error_code(resp, 'OE0217')216        rule_body.update({'priority': 0})217        code, resp = self.client.rules_create(self.org_id, rule_body)218        self.assertEqual(code, 400)219        self.verify_error_code(resp, 'OE0224')220        rules = self._get_rules()221        self.assertEqual(len(rules), 3)222        for priority, rule in enumerate(rules, start=1):223            self.assertEqual(rule['priority'], priority)224    def test_params_create_rule(self):225        """226        Test required parameters for creation rule.227        Steps:228           - 1. Verify requests without param:229             - name230             - conditions231           - 2. Verify requests with param, but entity doesn't exist:232             - owner_id233             - pool_id234           - 3. Verify request with conditions as empty array235             - err_code should be 400236             - localized code should be E0216237           - 3. 
Verify request without conditions in body238             - err_code should be 400239             - localized code should be E0216240           - 4. Try to create rule without owner or pool.241             - localized code should be E0216242           - 5. Try to create rule with unsupported param in condition243             - localized code should be E0212244           - 6. Try to create rule with unsupported condition type245             - localized code should be E0430246           - 7. Try to create rule with absent meta in condition247             - localized code should be E0430248           - 8. Try to create rule with absent type in condition249             - localized code should be E0430250           - 9. Try to create rule with pool in another org251             - localized code should be E0002252           - 10. Try to create rule with owner in another org253             - localized code should be E0002254           - 11. Try to create rule with invalid condition format255             - localized code should be E0344256        """257        rule_body = self._prepare_rule_body(258            name="Test rule",259            pool_id=self.org_pool_id,260            owner_id=self.employee['id'],261            conditions=[262                {"type": "name_starts_with", "meta_info": "test_"}263            ]264        )265        self._test_params(266            self.client.rules_create, rule_body, self.org_id,267            required_parameters=['name', 'conditions'],268            entities_should_exist=['owner_id', 'pool_id']269        )270        rule_body_copy = rule_body.copy()271        rule_body_copy['name'] = None272        code, resp = self.client.rules_create(self.org_id, rule_body_copy)273        self.assertEqual(code, 400)274        self.verify_error_code(resp, 'OE0216')275        rule_body_copy = rule_body.copy()276        rule_body_copy['conditions'] = []277        code, resp = self.client.rules_create(self.org_id, rule_body_copy)278        
self.assertEqual(code, 400)279        self.verify_error_code(resp, 'OE0216')280        rule_body_copy = rule_body.copy()281        rule_body_copy.pop('conditions')282        code, resp = self.client.rules_create(self.org_id, rule_body_copy)283        self.assertEqual(code, 400)284        self.verify_error_code(resp, 'OE0216')285        rule_body_copy = rule_body.copy()286        rule_body_copy['conditions'] = None287        code, resp = self.client.rules_create(self.org_id, rule_body_copy)288        self.assertEqual(code, 400)289        self.verify_error_code(resp, 'OE0216')290        rule_body_copy = rule_body.copy()291        rule_body_copy['pool_id'] = None292        rule_body_copy['owner_id'] = None293        code, resp = self.client.rules_create(self.org_id, rule_body_copy)294        self.assertEqual(code, 400)295        self.verify_error_code(resp, 'OE0216')296        rule_body_copy = rule_body.copy()297        rule_body_copy.pop('pool_id')298        rule_body_copy.pop('owner_id')299        code, resp = self.client.rules_create(self.org_id, rule_body_copy)300        self.assertEqual(code, 400)301        self.verify_error_code(resp, 'OE0216')302        rule_body_copy = copy.deepcopy(rule_body)303        rule_body_copy['conditions'][0]['unsupported_param'] = 'test_val'304        code, resp = self.client.rules_create(self.org_id, rule_body_copy)305        self.assertEqual(code, 400)306        self.verify_error_code(resp, 'OE0212')307        rule_body_copy = copy.deepcopy(rule_body)308        rule_body_copy['conditions'][0]['type'] = 'UNSUPPORTED_TYPE'309        code, resp = self.client.rules_create(self.org_id, rule_body_copy)310        self.assertEqual(code, 400)311        self.verify_error_code(resp, 'OE0430')312        required_condition_fields = ['type', 'meta_info']313        for param in required_condition_fields:314            rule_body_copy = copy.deepcopy(rule_body)315            rule_body_copy['conditions'][0].pop(param)316            code, resp = 
self.client.rules_create(self.org_id,317                                                  rule_body_copy)318            self.assertEqual(code, 400)319            self.verify_error_code(resp, 'OE0216')320        rule_body_copy = copy.deepcopy(rule_body)321        rule_body_copy['conditions'][0]['unsupported_key'] = None322        code, resp = self.client.rules_create(self.org_id,323                                              rule_body_copy)324        self.assertEqual(code, 400)325        self.verify_error_code(resp, 'OE0212')326        rule_body_copy = rule_body.copy()327        rule_body_copy['owner_id'] = self.org2_employee['id']328        code, resp = self.client.rules_create(self.org_id, rule_body_copy)329        self.assertEqual(code, 400)330        self.verify_error_code(resp, 'OE0005')331        rule_body_copy = rule_body.copy()332        rule_body_copy['pool_id'] = self.org2_pool_id333        code, resp = self.client.rules_create(self.org_id, rule_body_copy)334        self.assertEqual(code, 400)335        self.verify_error_code(resp, 'OE0005')336        rule_body_copy = rule_body.copy()337        rule_body_copy['conditions'] = ["123", "123"]338        code, resp = self.client.rules_create(self.org_id, rule_body_copy)339        self.assertEqual(code, 400)340        self.verify_error_code(resp, 'OE0344')341    @patch(AUTHORIZE_ACTION_METHOD)342    def test_list_rules(self, p_authorize):343        """344        Test list rules API with query params345        Steps:346           - 1. Create rules:347             - rule1: full pair rule for pool_org and user1 as owner348             - rule2: full pair rule for pool_org and user2 as owner349             - rule3: full pair rule for sub_pool1 and user2 as owner350             - rule4: full pair rule for sub_pool1 and user1 as owner351           - 2. Request list rules. Should return all 4 rules352           - 3. Request list rules with owner as user1. Should return:353             - rule1, rule4354           - 4. 
Request list rules with pool as org_pool. Should return:355             - rule1, rule2356           - 5. Request list rules with org_pool and user1 as owner.357             Should return:358               - rule1359           -9. Request list rules with invalid owner_id.360             Should return:361               - empty array.362        """363        def check_list_rules(expected_rules, **kwargs):364            expected_ids = set([r['id'] for r in expected_rules])365            rules = self._get_rules(**kwargs)366            self.assertEqual(len(rules), len(expected_rules))367            for rule in rules:368                self.assertTrue(rule['id'] in expected_ids)369        p_authorize.return_value = True370        rules = self._get_rules()371        self.assertEqual(len(rules), 0)372        rule1 = self._create_rule("rule1")373        rule2 = self._create_rule("rule2", owner_id=self.employee2['id'])374        rule3 = self._create_rule(375            "rule3", pool_id=self.sub_pools[0]['id'],376            owner_id=self.employee2['id'])377        rule4 = self._create_rule(378            "rule4", pool_id=self.sub_pools[0]['id'],379            owner_id=self.employee['id'])380        rules = self._get_rules()381        self.assertEqual(len(rules), 4)382        check_list_rules([rule1, rule4], owner_id=self.employee['id'])383        check_list_rules([rule1, rule2], pool_id=self.org_pool_id)384        check_list_rules([rule1], owner_id=self.employee['id'],385                         pool_id=self.org_pool_id)386        check_list_rules([], owner_id=self.gen_id())387    @patch(AUTHORIZE_ACTION_METHOD)388    def test_create_rule_with_existing_name(self, p_authorize):389        """390        Test for trying crete rule with name that is already exist391        Steps:392           - 1. Check initial count of rules. Should be 0.393           - 2. Create FullPair rule with name TestRule.394           - 3. Verify response code is 201.395           - 4. 
Verify count rules is 1.396           - 5. Create FullPair rule with the same name.397             - verify localized code is E0431398           - 6. Verify count rules is still 1.399           - 5. Create rules with the same name.400             - verify localized code is E0431401           - 6. Verify count rules is still 1.402        """403        p_authorize.return_value = True404        rules = self._get_rules()405        self.assertEqual(len(rules), 0)406        rule_body = self._prepare_rule_body(407            name="TestRule",408            pool_id=self.org_pool_id,409            owner_id=self.employee['id'],410            conditions=[411                {"type": "name_starts_with", "meta_info": "test_"}412            ]413        )414        code, full_rule = self.client.rules_create(self.org_id, rule_body)415        self.assertEqual(code, 201)416        rules = self._get_rules()417        self.assertEqual(len(rules), 1)418        code, resp = self.client.rules_create(self.org_id, rule_body)419        self.assertEqual(code, 409)420        self.verify_error_code(resp, 'OE0149')421        rules = self._get_rules()422        self.assertEqual(len(rules), 1)423        p_authorize.return_value = False424        rule_body_copy = rule_body.copy()425        rule_body_copy.pop('pool_id')426        code, resp = self.client.rules_create(self.org_id, rule_body_copy)427        self.assertEqual(code, 409)428        self.verify_error_code(resp, 'OE0149')429        rules = self._get_rules()430        self.assertEqual(len(rules), 1)431        rule_body_copy = rule_body.copy()432        rule_body_copy.pop('owner_id')433        code, resp = self.client.rules_create(self.org_id, rule_body_copy)434        self.assertEqual(code, 409)435        self.verify_error_code(resp, 'OE0149')436        rules = self._get_rules()437        self.assertEqual(len(rules), 1)438    @patch(AUTHORIZE_ACTION_METHOD)439    def test_create_full_rule_with_invalid_target_pair(self, p_authorize):440        
"""441        Test creation rule with invalid target pair.442        Steps:443           - 1. Check initial count of rules. Should be 0.444           - 2. Create FullPair rule with invalid target pair.445           - 3. Verify response code is 403.446           - 4. Verify localized code is E0379447           - 4. Verify count rules is 0.448        """449        p_authorize.return_value = False450        rules = self._get_rules()451        self.assertEqual(len(rules), 0)452        rule_body = self._prepare_rule_body(453            name="TestRule",454            pool_id=self.org_pool_id,455            owner_id=self.employee['id'],456            conditions=[457                {"type": "name_starts_with", "meta_info": "test_"}458            ]459        )460        code, resp = self.client.rules_create(self.org_id, rule_body)461        self.assertEqual(code, 403)462        self.verify_error_code(resp, "OE0379")463        rules = self._get_rules()464        self.assertEqual(len(rules), 0)465    @patch(AUTHORIZE_ACTION_METHOD)466    def test_create_rule_with_invalid_tag_is_meta(self, p_authorize):467        """468        Test creation rule with invalid tag is meta condition469        Steps:470           - 1. Try to create rule with tags_is conditions with all possible471             invalid meta_info parameter472           - 2. Verify all tries are failed473           - 3. 
Verify no new rule created474        """475        p_authorize.return_value = True476        rules = self._get_rules()477        self.assertEqual(len(rules), 0)478        valid_rule_body = self._prepare_rule_body(479            name="TestRule",480            pool_id=self.org_pool_id,481            owner_id=self.employee['id'],482            conditions=[483                {"type": "name_starts_with", "meta_info": "test_"}484            ]485        )486        def check_response(meta_info, error_code):487            conditions = [488                {"type": "tag_is", "meta_info": meta_info}489            ]490            valid_rule_body['conditions'] = conditions491            code, resp = self.client.rules_create(self.org_id,492                                                  valid_rule_body)493            self.assertEqual(code, 400)494            self.verify_error_code(resp, error_code)495        check_response(None, "OE0216")496        check_response(123, "OE0214")497        check_response({}, "OE0214")498        check_response("{}", "OE0216")499        check_response("123", "OE0344")500        check_response("\"123\"", "OE0344")501        check_response("{\"123\"}", "OE0219")502        check_response("{\"key\": \"key\"}", "OE0216")503        check_response("{\"key\": 123}", "OE0214")504        check_response("{\"value\": \"value\"}", "OE0216")505        check_response("{\"key\": \"key\", \"value\": 123}", "OE0214")506        check_response("{\"key\": \"k\", \"value\": \"v\", \"new\": \"value\"}",507                       "OE0212")508        check_response("{\"key\": \"key\", \"unexpected\": \"value\"}", "OE0216")509        rules = self._get_rules()510        self.assertEqual(len(rules), 0)511    @patch(AUTHORIZE_ACTION_METHOD)512    def test_update_rule(self, p_authorize):513        """514        Basic test for rule update515        Steps:516           - 1. Check initial count of rules. Should be 0.517           - 2. 
Create FullPair rule with 2 conditions.518           - 3. Update rule:519             - update name520             - update active521             - update pool_id522             - update owner_id523             - delete one condition524             - update one existing condition525             - add new condition526           - 4. Verify all changes with GET rule API527           - 5. Verify extra fields in PATCH response528        """529        final_conditions_after_update = set()530        p_authorize.return_value = True531        rules = self._get_rules()532        self.assertEqual(len(rules), 0)533        conditions = [534            {"type": "name_starts_with", "meta_info": "NAME_STARTS_WITH"},535            {"type": "name_contains", "meta_info": "NAME_CONTAINS"},536        ]537        full_rule = self._create_rule("TestRule", conditions=conditions)538        rules = self._get_rules()539        self.assertEqual(len(rules), 1)540        code, original_rule = self.client.rule_get(full_rule['id'])541        self.assertEqual(code, 200)542        modified_rule_body = copy.deepcopy(original_rule)543        modified_rule_body['name'] = 'new_name'544        modified_rule_body['active'] = False545        modified_rule_body['owner_id'] = self.employee2['id']546        modified_rule_body['pool_id'] = self.sub_pools[0]['id']547        modified_rule_body['conditions'][0]['type'] = 'name_ends_with'548        final_conditions_after_update.add('name_ends_with')549        modified_rule_body['conditions'][0]['meta_info'] = 'NAME_ENDS_WITH'550        modified_rule_body['conditions'] = modified_rule_body['conditions'][:-1]551        modified_rule_body['conditions'].append(552            {"type": "name_is", "meta_info": "NAME_IS"})553        final_conditions_after_update.add('name_is')554        code, patched_response = self.client.rule_update(original_rule['id'],555                                                         modified_rule_body)556        self.assertEqual(code, 
200)557        self.assertEqual(self.p_activities_publish.call_count, 2)558        code, modified_rule = self.client.rule_get(full_rule['id'])559        self.assertEqual(code, 200)560        self.assertEqual(modified_rule['name'], 'new_name')561        self.assertEqual(modified_rule['active'], False)562        self.assertEqual(modified_rule['pool_id'],563                         self.sub_pools[0]['id'])564        self.assertEqual(modified_rule['owner_id'], self.employee2['id'])565        self.assertEqual(len(modified_rule['conditions']), 2)566        for condition in modified_rule['conditions']:567            self.assertTrue(condition['type'] in final_conditions_after_update)568            self.assertTrue(569                condition['meta_info'].lower() in final_conditions_after_update)570        self.assertEqual(patched_response['creator_name'],571                         self.employee['name'])572        self.assertEqual(patched_response['owner_name'], self.employee2['name'])573        self.assertEqual(patched_response['pool_purpose'], "budget")574        self.assertEqual(patched_response['pool_name'],575                         self.sub_pools[0]['name'])576        for field in ["creator_name", "pool_name",577                      "owner_name", "pool_purpose"]:578            self.assertTrue(field in patched_response.keys())579    @patch(AUTHORIZE_ACTION_METHOD)580    def test_update_rule_with_existing_name(self, p_authorize):581        """582        Basic test for rule update583        Steps:584           - 1. Check initial count of rules. Should be 0.585           - 2. Create two rules with names rule0 and rule1586           - 3. Create rule with name rule2587           - 4. Create rule with name rule3588           - 5. Try to update rule0 with names rule1, rule2, rule3.589             - all updates should be failed.590             - verify localized code is E0431591           - 6. Check that rule0 has the same name592           - 7. 
Create rule for org2 with name rule_new_org.593           - 8. Update rule0 with name rule_new_org. Should be succeeded.594        """595        p_authorize.return_value = True596        rules = self._get_rules()597        self.assertEqual(len(rules), 0)598        rule0 = self._create_rule("rule0")599        rule1 = self._create_rule("rule1")600        rule2 = self._create_rule("rule2")601        rule3 = self._create_rule("rule3")602        for name in [rule1['name'], rule2['name'], rule3['name']]:603            rule0['name'] = name604            code, resp = self.client.rule_update(rule0['id'], rule0)605            self.assertEqual(code, 409)606            self.verify_error_code(resp, 'OE0149')607        code, rule = self.client.rule_get(rule0['id'])608        self.assertEqual(code, 200)609        self.assertEqual(rule['name'], "rule0")610        new_rule_name = "rule_new_org"611        with self.switch_user(self.org2_user_id):612            self._create_rule(new_rule_name, org_id=self.org2_id,613                              owner_id=self.org2_employee['id'],614                              pool_id=self.org2_pool_id)615        rule0['name'] = new_rule_name616        code, resp = self.client.rule_update(rule0['id'],617                                             rule0)618        # 5 rules created and one updated619        self.assertEqual(self.p_activities_publish.call_count, 6)620        self.assertEqual(code, 200)621        code, rule = self.client.rule_get(rule0['id'])622        self.assertEqual(code, 200)623        self.assertEqual(rule['name'], new_rule_name)624    @patch(AUTHORIZE_ACTION_METHOD)625    def test_update_rule_with_invalid_tag_is_meta(self, p_authorize):626        """627        Test update rule with invalid tag is meta condition628        Steps:629           - 1. Create valid rule with tag_is condition.630           - 2. Try to update rule with tags_is conditions with all possible631             invalid meta_info parameter632           - 3. 
Verify all tries are failed633           - 4. Verify no new rule created634        """635        p_authorize.return_value = True636        rules = self._get_rules()637        self.assertEqual(len(rules), 0)638        conditions = [639            {640             "type": "tag_is",641             "meta_info": "{\"key\":\"key\",\"value\":\"value\"}"642             }643        ]644        rule1 = self._create_rule('rule1', conditions=conditions)645        rules = self._get_rules()646        self.assertEqual(len(rules), 1)647        valid_rule_body = self._prepare_rule_body(648            name="TestRule",649            pool_id=self.org_pool_id,650            owner_id=self.employee['id'],651            conditions=[652                {"type": "name_starts_with", "meta_info": "test_"}653            ]654        )655        def check_response(meta_info, error_code):656            conditions = [657                {"type": "tag_is", "meta_info": meta_info}658            ]659            valid_rule_body['conditions'] = conditions660            code, resp = self.client.rule_update(rule1['id'],661                                                 valid_rule_body)662            self.assertEqual(code, 400)663            self.verify_error_code(resp, error_code)664        check_response(None, "OE0216")665        check_response(123, "OE0214")666        check_response({}, "OE0214")667        check_response("{}", "OE0216")668        check_response("123", "OE0344")669        check_response("\"123\"", "OE0344")670        check_response("{\"123\"}", "OE0219")671        check_response("{\"key\": \"key\"}", "OE0216")672        check_response("{\"key\": 123}", "OE0214")673        check_response("{\"value\": \"value\"}", "OE0216")674        check_response("{\"key\": \"key\", \"value\": 123}", "OE0214")675        check_response("{\"key\": \"k\", \"value\": \"v\", \"new\": \"value\"}",676                       "OE0212")677        check_response("{\"key\": \"key\", \"unexpected\": \"value\"}", 
"OE0216")678        rules = self._get_rules()679        self.assertEqual(len(rules), 1)680    @patch(AUTHORIZE_ACTION_METHOD)681    def test_update_rule_name_is_null(self, p_authorize):682        """683        Testing update rule name with None value684        Steps:685           - 1. Check initial count of rules. Should be 0.686           - 2. Create two FullPair rules with name rule0.687           - 3. Try to update rule0 with name is None688             - update should be failed.689             - verify localized code is E0216690           - 4. Try to update rule0 with name is integer value691             - update should be failed.692             - verify localized code is E0214693        """694        p_authorize.return_value = True695        rules = self._get_rules()696        self.assertEqual(len(rules), 0)697        full_rule = self._create_rule("rule0")698        full_rule['name'] = None699        code, resp = self.client.rule_update(full_rule['id'],700                                             full_rule)701        self.assertEqual(code, 400)702        self.verify_error_code(resp, 'OE0216')703        full_rule['name'] = 0704        code, resp = self.client.rule_update(full_rule['id'],705                                             full_rule)706        self.assertEqual(code, 400)707        self.verify_error_code(resp, 'OE0214')708    @patch(AUTHORIZE_ACTION_METHOD)709    def test_get_update_rule_invalid_id(self, p_authorize):710        """711        Test get_update_rule for invalid rule id712        Steps:713           - 1. Get rule by invalid id. Verify714             - code is 404715             - localized code is E0002716           - 2. Update rule by invalid id. Verify717             - code is 404718             - localized code is E0002719           - 3. Create rule for org2720           - 4. 
Try to update rule in org1 for rule id from org 2721        """722        p_authorize.return_value = True723        code, resp = self.client.rule_get(self.gen_id())724        self.assertEqual(code, 404)725        self.verify_error_code(resp, "OE0002")726        code, resp = self.client.rule_update(self.gen_id(), {})727        self.assertEqual(code, 404)728        self.verify_error_code(resp, "OE0002")729        # TODO uncomment after OSB-412 is done730        # with self.switch_user(self.org2_user_id):731        #     org2_rule = self._create_rule(732        #         'new_rule_name',733        #         org_id=self.org2_id,734        #         owner_id=self.org2_employee['id'],735        #         budget_id=self.org2_budget_id)736        # org2_rule_id = org2_rule['id']737        # code, resp = self.client.rule_get(org2_rule_id)738        # self.assertEqual(code, 404)739        # self.verify_error_code(resp, "OE0002")740        # code, resp = self.client.rule_update(org2_rule_id, {})741        # self.assertEqual(code, 404)742        # self.verify_error_code(resp, "OE0002")743    @patch(AUTHORIZE_ACTION_METHOD)744    def test_update_with_change_rule_priority(self, p_authorize):745        """746        Test update rule with changing rule priority747        Steps:748           - 2. Create 3 rules749           - 3. Update rule3 and set priority 1750           - 5. 
Verify new rule order751        """752        p_authorize.return_value = True753        rules = self._get_rules()754        self.assertEqual(len(rules), 0)755        rule1 = self._create_rule('rule1')756        rule2 = self._create_rule('rule2')757        rule3 = self._create_rule('rule3')758        rules = self._get_rules()759        self.assertEqual(len(rules), 3)760        original_order = [r['id'] for r in rules]761        rule2['priority'] = 0762        code, resp = self.client.rule_update(rule2['id'], rule2)763        self.assertEqual(code, 400)764        self.verify_error_code(resp, 'OE0224')765        rule2['priority'] = 5766        code, resp = self.client.rule_update(rule2['id'], rule2)767        self.assertEqual(code, 400)768        self.verify_error_code(resp, 'OE0217')769        rule1['priority'] = 1770        code, _ = self.client.rule_update(rule1['id'], rule1)771        self.assertEqual(code, 200)772        rules = self._get_rules()773        self.assertEqual(len(rules), 3)774        expected_new_order = [original_order[2], original_order[0],775                              original_order[1]]776        new_order = [r['id'] for r in rules]777        self.assertEqual(expected_new_order, new_order)778    @patch(AUTHORIZE_ACTION_METHOD)779    def test_update_without_pool_and_owner(self, p_authorize):780        """781        Test update rule without owner and pool specified782        Steps:783           - 1. Create full pair rule784           - 2. Update rule remove owner and pool785             - verify it's failed786             - verify localized code787           - 3. Update rule (remove owner)788           - 4. 
Update rule (remove pool)789             - verify it's failed790             - verify localized code791        """792        p_authorize.return_value = True793        rule = self._create_rule('test rule')794        rule_tmp = copy.deepcopy(rule)795        rule_tmp['pool_id'] = None796        rule_tmp['owner_id'] = None797        code, resp = self.client.rule_update(rule['id'], rule_tmp)798        self.assertEqual(code, 400)799        self.verify_error_code(resp, "OE0216")800        rule_tmp = copy.deepcopy(rule)801        rule_tmp['owner_id'] = None802        code, resp = self.client.rule_update(rule['id'], rule_tmp)803        self.assertEqual(code, 400)804        self.verify_error_code(resp, "OE0216")805        rule_tmp['pool_id'] = None806        code, resp = self.client.rule_update(rule['id'], rule_tmp)807        self.assertEqual(code, 400)808        self.verify_error_code(resp, "OE0216")809    @patch(AUTHORIZE_ACTION_METHOD)810    def test_update_remove_all_conditions(self, p_authorize):811        """812        Test update rule without conditions813        Steps:814           - 1. Create full pair rule815           - 2. Update rule without conditions(empty array)816             - verify it's failed817             - verify localized code is E0216818        """819        p_authorize.return_value = True820        rule = self._create_rule('test rule')821        rule['conditions'] = []822        code, resp = self.client.rule_update(rule['id'], rule)823        self.assertEqual(code, 400)824        self.verify_error_code(resp, "OE0216")825    @patch(AUTHORIZE_ACTION_METHOD)826    def test_update_invalid_conditions(self, p_authorize):827        """828        Test update rule with invalid conditions829        Steps:830           - 1. Create full pair rule831           - 2. 
Update rule with832             - not supported type in condition(E0430)833             - not supported key in condition(E0212)834             - with condition type is None(E0430)835             - with condition meta is None(E0216)836             - verify all requests are failed837        """838        p_authorize.return_value = True839        rule = self._create_rule('test rule')840        tmp_rule = copy.deepcopy(rule)841        tmp_rule['conditions'][0]['type'] = "unsupported_type"842        code, resp = self.client.rule_update(rule['id'], tmp_rule)843        self.assertEqual(code, 400)844        self.verify_error_code(resp, "OE0430")845        tmp_rule = copy.deepcopy(rule)846        tmp_rule['conditions'][0]['unsupported_key'] = "dummy"847        code, resp = self.client.rule_update(rule['id'], tmp_rule)848        self.assertEqual(code, 400)849        self.verify_error_code(resp, "OE0212")850        tmp_rule = copy.deepcopy(rule)851        tmp_rule['conditions'][0]["type"] = None852        code, resp = self.client.rule_update(rule['id'], tmp_rule)853        self.assertEqual(code, 400)854        self.verify_error_code(resp, "OE0430")855        tmp_rule = copy.deepcopy(rule)856        tmp_rule['conditions'][0]["meta_info"] = None857        code, resp = self.client.rule_update(rule['id'], tmp_rule)858        self.assertEqual(code, 400)859        self.verify_error_code(resp, "OE0216")860    @patch(AUTHORIZE_ACTION_METHOD)861    def test_update_invalid_target_pair(self, p_authorize):862        """863        Test update rule without owner and pool specified864        Steps:865           - 1. Create full pair rule866           - 2. 
Update rule with invalid target pair867             - verify it's failed868             - verify localized code is E0379869        """870        p_authorize.return_value = True871        rule = self._create_rule('test rule')872        p_authorize.return_value = False873        code, resp = self.client.rule_update(rule['id'], rule)874        self.assertEqual(code, 403)875        self.verify_error_code(resp, "OE0379")876    @patch(AUTHORIZE_ACTION_METHOD)877    def test_update_active_is_none(self, p_authorize):878        """879        Test update rule with invalid active field880        Steps:881           - 1. Create full pair rule882           - 2. Update rule with active is None883             - verify it's failed884             - verify localized code is E0212885        """886        p_authorize.return_value = True887        rule = self._create_rule('test rule')888        rule['active'] = None889        code, resp = self.client.rule_update(rule['id'], rule)890        self.assertEqual(code, 400)891        self.verify_error_code(resp, "OE0216")892    @patch(AUTHORIZE_ACTION_METHOD)893    def test_update_invalid_pool_or_owner_id(self, p_authorize):894        """895        Test update rule with specifying invalid owner id.896        Steps:897           - 1. Create full pair rule898           - 2. Update rule with random UUID for pool_id and owner_id899             - verify it's failed900             - verify localized code is E0002901           -3. 
Update rule with pool_id and owner_id from another org.902        """903        p_authorize.return_value = True904        rule = self._create_rule('test rule')905        for param in ['pool_id', 'owner_id']:906            rule_tmp = copy.deepcopy(rule)907            rule_tmp[param] = self.gen_id()908            code, resp = self.client.rule_update(rule['id'], rule_tmp)909            self.assertEqual(code, 400)910            self.verify_error_code(resp, "OE0005")911        rule_tmp = copy.deepcopy(rule)912        rule_tmp['pool_id'] = self.org2_pool_id913        code, resp = self.client.rule_update(rule['id'], rule_tmp)914        self.assertEqual(code, 400)915        self.verify_error_code(resp, "OE0005")916        rule_tmp = copy.deepcopy(rule)917        rule_tmp['owner_id'] = self.org2_employee['id']918        code, resp = self.client.rule_update(rule['id'], rule_tmp)919        self.assertEqual(code, 400)920        self.verify_error_code(resp, "OE0005")921    @patch(AUTHORIZE_ACTION_METHOD)922    def test_test_patch_rule_fields(self, p_authorize):923        """924        Test update rule with field by filed925        Steps:926           - 1. Create full pair rule927           - 2. Update all fields in rule one by one using only this key in body928           - 3. 
Verify all fields are updated929        """930        p_authorize.return_value = True931        rule = self._create_rule('test rule')932        new_rule = {933            'name': 'new_name',934            'pool_id': self.sub_pools[0]['id'],935            'owner_id': self.employee2['id'],936            'conditions':937                [{"type": "name_ends_with", "meta_info": "name_ends_with"}]938        }939        for key in new_rule.keys():940            body = {key: new_rule[key]}941            code, resp = self.client.rule_update(rule['id'], body)942            self.assertEqual(code, 200)943            code, rule = self.client.rule_get(rule['id'])944            self.assertEqual(code, 200)945            if key == 'conditions':946                for cond in rule[key]:947                    cond.pop('id')948            self.assertEqual(rule[key], new_rule[key])949    @patch(AUTHORIZE_ACTION_METHOD)950    def test_delete_rule(self, p_authorize):951        """952        Test delete rule953        Steps:954           - 1. Create rule rule1955           - 2. Create rule rule2956           - 3. Create rule rule3957           - 4. Verify count of rules, should be 3958           - 5. Delete rules one by one and check count -1959           - 6. 
Try to delete rule that is doesn't exist960        """961        p_authorize.return_value = True962        rules = self._get_rules()963        self.assertEqual(len(rules), 0)964        rule1 = self._create_rule('rule1')965        rule2 = self._create_rule('rule2')966        rule3 = self._create_rule('rule3')967        rules = self._get_rules()968        self.assertEqual(len(rules), 3)969        code, _ = self.client.rule_delete(rule1['id'])970        self.assertEqual(code, 204)971        self.assertEqual(self.p_activities_publish.call_count, 4)972        rules = self._get_rules()973        self.assertEqual(len(rules), 2)974        code, _ = self.client.rule_delete(rule2['id'])975        self.assertEqual(code, 204)976        self.assertEqual(self.p_activities_publish.call_count, 5)977        rules = self._get_rules()978        self.assertEqual(len(rules), 1)979        code, _ = self.client.rule_delete(rule3['id'])980        self.assertEqual(code, 204)981        self.assertEqual(self.p_activities_publish.call_count, 6)982        rules = self._get_rules()983        self.assertEqual(len(rules), 0)984        code, resp = self.client.rule_delete(self.gen_id())985        self.assertEqual(code, 404)986        self.verify_error_code(resp, 'OE0002')987    @patch(AUTHORIZE_ACTION_METHOD)988    def test_delete_rule_priority_change(self, p_authorize):989        """990        Test delete rule991        Steps:992           - 1. Create 3 rules993           - 2. Verify count of rules, should be 3994           - 3. Delete middle value995           - 4. 
Check priority order updated and have no gaps996        """997        p_authorize.return_value = True998        rules = self._get_rules()999        self.assertEqual(len(rules), 0)1000        rule1 = self._create_rule('rule1')1001        rule2 = self._create_rule('rule2')1002        rule3 = self._create_rule('rule3')1003        rules = self._get_rules()1004        self.assertEqual(len(rules), 3)1005        code, _ = self.client.rule_delete(rule2['id'])1006        self.assertEqual(code, 204)1007        rules = self._get_rules()1008        self.assertEqual(len(rules), 2)1009        for i, rule in enumerate(rules, start=1):1010            self.assertEqual(rule['priority'], i)1011class TestApplyRuleApi(TestRulesApiBase):1012    def setUp(self, version='v2'):1013        super().setUp(version)1014        patch('rest_api_server.controllers.cloud_account.'1015              'CloudAccountController._configure_report').start()1016        patch('rest_api_server.handlers.v1.base.BaseAuthHandler.'1017              '_get_user_info', return_value=self.user).start()1018        aws_cloud_acc = {1019            'name': 'my cloud_acc',1020            'type': 'aws_cnr',1021            'config': {1022                'access_key_id': 'key',1023                'secret_access_key': 'secret',1024                'config_scheme': 'create_report'1025            }1026        }1027        _, self.cloud_acc = self.create_cloud_account(1028            self.org_id, aws_cloud_acc, auth_user_id=self.user_id)1029        _, cloud_rules = self.client.rules_list(self.org_id)1030        self.set_allowed_pair(self.user_id,1031                              cloud_rules['rules'][0]['pool_id'])1032        self.cloud_acc_id = self.cloud_acc['id']1033    def _create_resources(self, names, cloud_resource_ids=[]):1034        valid_body = {'resources': []}1035        for i, name in enumerate(names):1036            cloud_resource_id = 'res_id_%s' % self.gen_id()1037            if i < len(cloud_resource_ids):1038      
          cloud_resource_id = cloud_resource_ids[i]1039            valid_body['resources'].append(1040                {1041                    'cloud_resource_id': cloud_resource_id,1042                    'name': name,1043                    'resource_type': 'VM',1044                }1045            )1046        code, result = self.cloud_resource_create_bulk(1047            self.cloud_acc_id, valid_body, return_resources=True)1048        self.assertEqual(code, 200)1049        return result['resources']1050    def _verify_assignments(self, resources, expected_map):1051        self.assertEqual(len(resources), len(expected_map.keys()))1052        for resource in resources:1053            self.assertEqual(1054                resource.get('employee_id'),1055                expected_map[resource['name']]['employee_id'])1056            self.assertEqual(1057                resource.get('pool_id'),1058                expected_map[resource['name']]['pool_id'])1059    @patch(AUTHORIZE_ACTION_METHOD)1060    @patch(RULE_APPLY_CONDITION_CLOUD_IS)1061    def test_apply_rules_all_supported_conditions(self, p_rule_apply,1062                                                  p_authorize):1063        """1064        Test checks all supported conditions in one rule1065        Steps:1066           - 1. Create full pair rule rule1 with:1067             - name_is1068             - name_starts_with1069             - name_ends_with1070             - name_contains1071           - 2. Create resources:1072             - one matched all conditions1073             - one doesn't match name_is1074           - 3. 
Verify the first resource is assigned, the second one is not.1075        """1076        p_authorize.return_value = True1077        p_rule_apply.return_value = False1078        conditions = [1079            {"type": "name_starts_with", "meta_info": "starts_"},1080            {"type": "name_ends_with", "meta_info": "_ends"},1081            {"type": "name_contains", "meta_info": "contains"},1082            {"type": "name_is", "meta_info": "starts_contains_ends"},1083        ]1084        self._create_rule('test_rule', conditions=conditions,1085                          pool_id=self.sub_pools_ids[0],1086                          owner_id=self.employee2['id'])1087        resources = self._create_resources(1088            ['starts_contains_ends', 'starts_contains_new_ends']1089        )1090        expected_map = {1091            'starts_contains_ends': {1092                'pool_id': self.sub_pools_ids[0],1093                'employee_id': self.employee2['id']1094            },1095            'starts_contains_new_ends': {1096                'pool_id': self.org_pool_id,1097                'employee_id':  self.employee['id']1098            }1099        }1100        self._verify_assignments(resources, expected_map)1101    @patch(AUTHORIZE_ACTION_METHOD)1102    @patch(RULE_APPLY_CONDITION_CLOUD_IS)1103    def test_apply_rules_multiple_rules_multiple_resources(self, p_rule_apply,1104                                                           p_authorize):1105        """1106        Test checks applying rules. Multiple rules, multiple resources1107        Steps:1108           - 1. Create multiple rules rule1, rule2, rule31109           - 2. Create resources:1110             - two resources matched rule11111             - two resources matched rule21112             - two resources matched rule31113             - two doesn't match any rules1114           - 3. 
Verify the resource assignments.1115        """1116        p_authorize.return_value = True1117        p_rule_apply.return_value = False1118        conditions = [1119            {"type": "name_starts_with", "meta_info": "starts_"},1120        ]1121        rule1 = self._create_rule('rule1', conditions=conditions,1122                                  pool_id=self.sub_pools_ids[0],1123                                  owner_id=self.employee['id'])1124        conditions = [1125            {"type": "name_ends_with", "meta_info": "_ends"},1126        ]1127        rule2 = self._create_rule('rule2', conditions=conditions,1128                                  pool_id=self.sub_pools_ids[1],1129                                  owner_id=self.employee2['id'])1130        conditions = [1131            {"type": "name_contains", "meta_info": "contains"},1132        ]1133        rule3 = self._create_rule('rule3', conditions=conditions,1134                                  pool_id=self.sub_pools_ids[2],1135                                  owner_id=self.employee3['id'])1136        res_map = {1137            'starts_name_1': 'cloud_resource_id_1',1138            'name_2': 'starts_/res_2/id',1139            'name_3_ends': 'cloud_resource_id_3',1140            'name_4': 'res_4/_ends',1141            'name_contains_5': 'cloud_resource_id_5',1142            'name_6': 'res_id/contains_6',1143            'name_7': 'res_7_id',1144            'name_8': 'res_8_id'1145        }1146        resources = self._create_resources(1147            names=list(res_map.keys()),1148            cloud_resource_ids=list(res_map.values())1149        )1150        expected_map = {1151            'starts_name_1': {1152                'pool_id': self.sub_pools_ids[0],1153                'employee_id': self.employee['id']1154            },1155            'name_2': {1156                'pool_id': self.sub_pools_ids[0],1157                'employee_id': self.employee['id']1158            },1159            
'name_3_ends': {1160                'pool_id': self.sub_pools_ids[1],1161                'employee_id': self.employee2['id']1162            },1163            'name_4': {1164                'pool_id': self.sub_pools_ids[1],1165                'employee_id': self.employee2['id']1166            },1167            'name_contains_5': {1168                'pool_id': self.sub_pools_ids[2],1169                'employee_id': self.employee3['id']1170            },1171            'name_6': {1172                'pool_id': self.sub_pools_ids[2],1173                'employee_id': self.employee3['id']1174            },1175            'name_7': {1176                'pool_id': self.org_pool_id,1177                'employee_id': self.employee['id']1178            },1179            'name_8': {1180                'pool_id': self.org_pool_id,1181                'employee_id': self.employee['id']1182            },1183        }1184        self._verify_assignments(resources, expected_map)1185        applied_rules_map = {1186            'starts_name_1': [1187                {'id': rule1['id'], 'name': rule1['name'],1188                 'pool_id': rule1['pool_id']}1189            ],1190            'name_2': [1191                {'id': rule1['id'], 'name': rule1['name'],1192                 'pool_id': rule1['pool_id']}1193            ],1194            'name_3_ends': [1195                {'id': rule2['id'], 'name': rule2['name'],1196                 'pool_id': rule2['pool_id']}1197            ],1198            'name_4': [1199                {'id': rule2['id'], 'name': rule2['name'],1200                 'pool_id': rule2['pool_id']}1201            ],1202            'name_contains_5': [1203                {'id': rule3['id'], 'name': rule3['name'],1204                 'pool_id': rule3['pool_id']}1205            ],1206            'name_6': [1207                {'id': rule3['id'], 'name': rule3['name'],1208                 'pool_id': rule3['pool_id']}1209            ],1210            'name_7': 
None,1211            'name_8': None1212        }1213        for r in resources:1214            name = r.get('name')1215            applied_rules = applied_rules_map[name]1216            self.assertEqual(applied_rules, r.get('applied_rules'))1217            if applied_rules:1218                self.assertEqual(1219                    r.get('pool_id'), applied_rules[0]['pool_id'])1220    @patch(AUTHORIZE_ACTION_METHOD)1221    def test_apply_rules_rules_priority(self, p_authorize):1222        """1223        Test checks applying rules by priority.1224        Steps:1225           - 1. Create rule1 and rule2 with conflicted conditions1226           - 2. Create 4 resources matched both rules1227           - 3. Verify the resource assignments.1228        """1229        p_authorize.return_value = True1230        conditions = [1231            {"type": "name_starts_with", "meta_info": "rule"},1232        ]1233        self._create_rule('rule1', conditions=conditions,1234                          pool_id=self.sub_pools_ids[0],1235                          owner_id=self.employee2['id'])1236        self._create_rule('rule2', conditions=conditions,1237                          pool_id=self.sub_pools_ids[1],1238                          owner_id=self.employee3['id'])1239        resources = self._create_resources(1240            [1241                'rule1',1242                'rule2',1243                'rule3',1244                'rule4',1245            ]1246        )1247        expected_map = {1248            'rule1': {1249                'pool_id': self.sub_pools_ids[1],1250                'employee_id': self.employee3['id']1251            },1252            'rule2': {1253                'pool_id': self.sub_pools_ids[1],1254                'employee_id': self.employee3['id']1255            },1256            'rule3': {1257                'pool_id': self.sub_pools_ids[1],1258                'employee_id': self.employee3['id']1259            },1260            'rule4': {1261          
      'pool_id': self.sub_pools_ids[1],1262                'employee_id': self.employee3['id']1263            }1264        }1265        self._verify_assignments(resources, expected_map)1266    @patch(AUTHORIZE_ACTION_METHOD)1267    @patch(RULE_APPLY_CONDITION_CLOUD_IS)1268    def test_apply_rules_no_rules(self, p_rule_apply, p_authorize):1269        """1270        Test checks creation resources in case of no rules are available in org1271        Steps:1272           - 1. Create 4 resources.1273           - 2. Verify root assignments.1274           - 3. Create 4 rules1275           - 4. Create resource that doesn't match any rules.1276           - 5. Verify root assignments.1277        """1278        p_authorize.return_value = True1279        p_rule_apply.return_value = False1280        resources = self._create_resources(1281            [1282                'rule1',1283                'rule2',1284                'rule3',1285                'rule4',1286            ]1287        )1288        expected_map = {1289            'rule1': {1290                'pool_id': self.org_pool_id,1291                'employee_id': self.employee['id']1292            },1293            'rule2': {1294                'pool_id': self.org_pool_id,1295                'employee_id': self.employee['id']1296            },1297            'rule3': {1298                'pool_id': self.org_pool_id,1299                'employee_id': self.employee['id']1300            },1301            'rule4': {1302                'pool_id': self.org_pool_id,1303                'employee_id': self.employee['id']1304            }1305        }1306        self._verify_assignments(resources, expected_map)1307        conditions = [1308            {"type": "name_starts_with", "meta_info": "rule"},1309        ]1310        self._create_rule('rule1', conditions=conditions,1311                          pool_id=self.sub_pools_ids[0],1312                          owner_id=self.employee2['id'])1313        
self._create_rule('rule2', conditions=conditions,1314                          pool_id=self.sub_pools_ids[1],1315                          owner_id=self.employee3['id'])1316        resources = self._create_resources(1317            [1318                '_rule1',1319                '_rule2',1320                '_rule3',1321                '_rule4',1322            ]1323        )1324        expected_map = {1325            '_rule1': {1326                'pool_id': self.org_pool_id,1327                'employee_id': self.employee['id']1328            },1329            '_rule2': {1330                'pool_id': self.org_pool_id,1331                'employee_id': self.employee['id']1332            },1333            '_rule3': {1334                'pool_id': self.org_pool_id,1335                'employee_id': self.employee['id']1336            },1337            '_rule4': {1338                'pool_id': self.org_pool_id,1339                'employee_id': self.employee['id']1340            }1341        }1342        self._verify_assignments(resources, expected_map)1343    @patch(AUTHORIZE_ACTION_METHOD)1344    @patch(RULE_APPLY_CONDITION_CLOUD_IS)1345    def test_apply_rules_disabled_rules(self, p_rule_apply, p_authorize):1346        """1347        Test checks creation resources in case disabled rules1348        Steps:1349           - 1. Create 2 rules rule1 and rule2.1350           - 2. Create resource matched both rules1351           - 3. Verify rule1 is applied1352           - 4. Disable rule11353           - 5. Create resource matched both rules1354           - 3. Verify rule2 is applied1355           - 4. Disable rule21356           - 5. Create resource matched both rules1357           - 3. 
Verify no rules are applied1358        """1359        p_authorize.return_value = True1360        p_rule_apply.return_value = False1361        conditions = [1362            {"type": "name_starts_with", "meta_info": "rule"},1363        ]1364        rule1 = self._create_rule('rule1', conditions=conditions,1365                                  pool_id=self.sub_pools_ids[0],1366                                  owner_id=self.employee2['id'])1367        rule2 = self._create_rule('rule2', conditions=conditions,1368                                  pool_id=self.sub_pools_ids[1],1369                                  owner_id=self.employee3['id'])1370        resources = self._create_resources(1371            [1372                'rule2'1373            ]1374        )1375        expected_map = {1376            'rule2': {1377                'pool_id': self.sub_pools_ids[1],1378                'employee_id': self.employee3['id']1379            }1380        }1381        self._verify_assignments(resources, expected_map)1382        code, resp = self.client.rule_update(rule2['id'], {'active': False})1383        self.assertEqual(code, 200)1384        resources = self._create_resources(1385            [1386                'rule1'1387            ]1388        )1389        expected_map = {1390            'rule1': {1391                'pool_id': self.sub_pools_ids[0],1392                'employee_id': self.employee2['id']1393            }1394        }1395        self._verify_assignments(resources, expected_map)1396        code, resp = self.client.rule_update(rule1['id'], {'active': False})1397        self.assertEqual(code, 200)1398        resources = self._create_resources(1399            [1400                'rule3'1401            ]1402        )1403        expected_map = {1404            'rule3': {1405                'pool_id': self.org_pool_id,1406                'employee_id': self.employee['id']1407            }1408        }1409        self._verify_assignments(resources, 
expected_map)1410    @patch(AUTHORIZE_ACTION_METHOD)1411    def test_apply_rules_complex(self, p_authorize):1412        """1413        Test checks creation resources in case complex rules1414        Steps:1415           - 1. Create multiple rules with multiple condition types.1416           - 2. Create resource one resource match only one rule.1417           - 3. Verify assignments.1418        """1419        p_authorize.return_value = True1420        conditions = [1421            {"type": "name_starts_with", "meta_info": "qa"},1422            {"type": "name_ends_with", "meta_info": "prod"},1423            {"type": "name_contains", "meta_info": "resource"},1424        ]1425        self._create_rule('full_pair_rule1', conditions=conditions,1426                          pool_id=self.sub_pools_ids[0],1427                          owner_id=self.employee2['id'])1428        conditions = [1429            {"type": "name_is", "meta_info": "special_name"},1430        ]1431        self._create_rule('full_pair_rule2', conditions=conditions,1432                          pool_id=self.sub_pools_ids[1],)1433        resources = self._create_resources(1434            [1435             'qa_resource_prod',1436             'qa_resource1_prod',1437             'special_name',1438            ]1439        )1440        expected_map = {1441            'qa_resource_prod': {1442                'pool_id': self.sub_pools_ids[0],1443                'employee_id': self.employee2['id']1444            },1445            'qa_resource1_prod': {1446                'pool_id': self.sub_pools_ids[0],1447                'employee_id': self.employee2['id']1448            },1449            'special_name': {1450                'pool_id': self.sub_pools_ids[1],1451                'employee_id': self.employee['id']1452            },1453        }1454        self._verify_assignments(resources, expected_map)1455    @patch(AUTHORIZE_ACTION_METHOD)1456    def test_apply_rules_tag_condition(self, p_authorize):1457     
   """1458        Test checks creation resources in case of tag condition in rule1459        Steps:1460           - 1. Create rule with tag_is condition rule11461           - 2. Create resources with tags matched rule11462           - 3. Verify assignment.1463        """1464        p_authorize.return_value = True1465        conditions = [1466            {1467             "type": "tag_is",1468             "meta_info": "{\"key\":\"key\",\"value\":\"value\"}"1469             }1470        ]1471        self._create_rule('rule1', conditions=conditions)1472        valid_body = {'resources': []}1473        valid_body['resources'].append(1474            {1475                'cloud_resource_id': 'res_id_%s' % self.gen_id(),1476                'name': 'resource',1477                'resource_type': 'VM',1478                'tags': {1479                    "key": "value",1480                    "key1": "value1",1481                    "key2": "value2",1482                }1483            }1484        )1485        valid_body['resources'].append(1486            {1487                'cloud_resource_id': 'res_id_%s' % self.gen_id(),1488                'name': 'resource1',1489                'resource_type': 'VM',1490                'tags': {1491                    "key1": "value1",1492                    "key": "value",1493                    "key2": "value2",1494                }1495            }1496        )1497        valid_body['resources'].append(1498            {1499                'cloud_resource_id': 'res_id_%s' % self.gen_id(),1500                'name': 'resource2',1501                'resource_type': 'VM',1502                'tags': {1503                    "key1": "value1",1504                    "key2": "value2",1505                    "key": "value"1506                }1507            }1508        )1509        code, result = self.cloud_resource_create_bulk(1510            self.cloud_acc_id, valid_body, return_resources=True)1511        self.assertEqual(code, 200)1512 
       expected_map = {1513            'resource': {1514                'pool_id': self.org_pool_id,1515                'employee_id': self.employee['id']1516            },1517            'resource1': {1518                'pool_id': self.org_pool_id,1519                'employee_id': self.employee['id']1520            },1521            'resource2': {1522                'pool_id': self.org_pool_id,1523                'employee_id': self.employee['id']1524            },1525        }1526        self._verify_assignments(result['resources'], expected_map)1527    @patch(AUTHORIZE_ACTION_METHOD)1528    def test_apply_rules_tag_exists_condition(self, p_authorize):1529        """1530        Test checks creation resources in case of tag exists condition in rule1531        Steps:1532           - 1. Create rule with tag_exists condition rule11533           - 2. Create resources with tags matched rule11534           - 3. Verify assignment.1535        """1536        p_authorize.return_value = True1537        conditions = [1538            {"type": "tag_exists", "meta_info": "key"}1539        ]1540        self._create_rule('rule1', conditions=conditions)1541        valid_body = {'resources': []}1542        valid_body['resources'].append(1543            {1544                'cloud_resource_id': 'res_id_%s' % self.gen_id(),1545                'name': 'resource',1546                'resource_type': 'VM',1547                'tags': {1548                    "key": "value",1549                    "key1": "value1",1550                    "key2": "value2",1551                }1552            }1553        )1554        valid_body['resources'].append(1555            {1556                'cloud_resource_id': 'res_id_%s' % self.gen_id(),1557                'name': 'resource1',1558                'resource_type': 'VM',1559                'tags': {1560                    "key1": "value1",1561                    "key": "value",1562                    "key2": "value2",1563                }1564     
       }1565        )1566        valid_body['resources'].append(1567            {1568                'cloud_resource_id': 'res_id_%s' % self.gen_id(),1569                'name': 'resource2',1570                'resource_type': 'VM',1571                'tags': {1572                    "key1": "value1",1573                    "key2": "value2",1574                    "key": "value"1575                }1576            }1577        )1578        valid_body['resources'].append(1579            {1580                'cloud_resource_id': 'res_id_%s' % self.gen_id(),1581                'name': 'resource3',1582                'resource_type': 'VM',1583                'tags': {1584                    "key1": "value1",1585                    "key2": "value2",1586                    "key": "value3"1587                }1588            }1589        )1590        code, result = self.cloud_resource_create_bulk(1591            self.cloud_acc_id, valid_body, return_resources=True)1592        self.assertEqual(code, 200)1593        expected_map = {1594            'resource': {1595                'pool_id': self.org_pool_id,1596                'employee_id': self.employee['id']1597            },1598            'resource1': {1599                'pool_id': self.org_pool_id,1600                'employee_id': self.employee['id']1601            },1602            'resource2': {1603                'pool_id': self.org_pool_id,1604                'employee_id': self.employee['id']1605            },1606            'resource3': {1607                'pool_id': self.org_pool_id,1608                'employee_id': self.employee['id']1609            },1610        }1611        self._verify_assignments(result['resources'], expected_map)1612    @patch(AUTHORIZE_ACTION_METHOD)1613    def test_apply_rules_environment_resources(self, p_authorize):1614        p_authorize.return_value = True1615        name = 'some_name'1616        conditions = [1617            {"type": "name_is", "meta_info": name}1618        
]1619        self._create_rule('rule1', conditions=conditions)1620        code, res = self.environment_resource_create(1621            self.org_id, {'name': name, 'resource_type': 'type'})1622        self.assertEqual(code, 201)1623        expected_map = {1624            name: {1625                'pool_id': self.org_pool_id,1626                'employee_id': self.employee['id']1627            }1628        }1629        self._verify_assignments([res], expected_map)1630    @patch(AUTHORIZE_ACTION_METHOD)1631    def test_apply_rules_cluster(self, p_authorize):1632        p_authorize.return_value = True1633        conditions = [1634            {"type": "tag_exists", "meta_info": "key"}1635        ]1636        self._create_rule('rule1', conditions=conditions)1637        valid_body = {'resources': []}1638        valid_body['resources'].extend([1639            {1640                'cloud_resource_id': 'res_id_%s' % self.gen_id(),1641                'name': 'resource',1642                'resource_type': 'VM',1643                'tags': {1644                    "key": "value",1645                }1646            },1647            {1648                'cloud_resource_id': 'res_id_%s' % self.gen_id(),1649                'name': 'resource1',1650                'resource_type': 'VM',1651                'tags': {1652                    "key": "value",1653                }1654            }])1655        code, ct = self.client.cluster_type_create(1656            self.org_id, {'name': 'some', 'tag_key': 'key'})1657        self.assertEqual(code, 201)1658        code, result = self.cloud_resource_create_bulk(1659            self.cloud_acc_id, valid_body, return_resources=True,1660            behavior='update_existing')1661        self.assertEqual(code, 200)1662        expected_map = {1663            'resource': {1664                'pool_id': self.org_pool_id,1665                'employee_id': self.employee['id']1666            },1667            'resource1': {1668                
'pool_id': self.org_pool_id,1669                'employee_id': self.employee['id']1670            }1671        }1672        cluster = next(self.resources_collection.find(1673            {'cluster_type_id': ct['id']}))1674        self._verify_assignments(result['resources'], expected_map)1675        self.assertEqual(cluster.get('pool_id'), self.org_pool_id)1676    @patch(AUTHORIZE_ACTION_METHOD)1677    def test_apply_rules_tag_value_starts_condition(self, p_authorize):1678        """1679        Test checks creation resources in case of tag value starts with1680        condition in rule1681        Steps:1682           - 1. Create rule with tag_value_starts_with condition rule11683           - 2. Create resources with tags matched rule11684           - 3. Verify assignment.1685        """1686        p_authorize.return_value = True1687        conditions = [1688            {1689                "type": "tag_value_starts_with",1690                "meta_info": "{\"key\":\"key\",\"value\":\"value\"}"1691            }1692        ]1693        self._create_rule('rule1', conditions=conditions)1694        valid_body = {'resources': []}1695        valid_body['resources'].append(1696            {1697                'cloud_resource_id': 'res_id_%s' % self.gen_id(),1698                'name': 'resource',1699                'resource_type': 'VM',1700                'tags': {1701                    "key": "value",1702                    "key1": "value1",1703                    "key2": "value2",1704                }1705            }1706        )1707        valid_body['resources'].append(1708            {1709                'cloud_resource_id': 'res_id_%s' % self.gen_id(),1710                'name': 'resource1',1711                'resource_type': 'VM',1712                'tags': {1713                    "key1": "value1",1714                    "key": "value value",1715                    "key2": "value2",1716                }1717            }1718        )1719        
valid_body['resources'].append(1720            {1721                'cloud_resource_id': 'res_id_%s' % self.gen_id(),1722                'name': 'resource2',1723                'resource_type': 'VM',1724                'tags': {1725                    "key1": "value1",1726                    "key2": "value2",1727                    "key": "value vbivalue"1728                }1729            }1730        )1731        valid_body['resources'].append(1732            {1733                'cloud_resource_id': 'res_id_%s' % self.gen_id(),1734                'name': 'resource3',1735                'resource_type': 'VM',1736                'tags': {1737                    "key": "valueue",1738                    "key1": "value1",1739                    "key2": "value2",1740                }1741            }1742        )1743        code, result = self.cloud_resource_create_bulk(1744            self.cloud_acc_id, valid_body, return_resources=True)1745        self.assertEqual(code, 200)1746        expected_map = {1747            'resource': {1748                'pool_id': self.org_pool_id,1749                'employee_id': self.employee['id']1750            },1751            'resource1': {1752                'pool_id': self.org_pool_id,1753                'employee_id': self.employee['id']1754            },1755            'resource2': {1756                'pool_id': self.org_pool_id,1757                'employee_id': self.employee['id']1758            },1759            'resource3': {1760                'pool_id': self.org_pool_id,1761                'employee_id': self.employee['id']1762            },1763        }1764        self._verify_assignments(result['resources'], expected_map)1765    @patch(AUTHORIZE_ACTION_METHOD)1766    @patch(RULE_APPLY_CONDITION_CLOUD_IS)1767    def test_apply_rules_resource_type(self, p_rule_apply, p_authorize):1768        """1769        Test checks creation resources in case of res type condition in rule1770        Steps:1771           - 1. 
Create rule with resource_type_is condition rule11772           - 2. Create resource with type matched rule1 and two more resource1773             which are not1774           - 3. Verify assignment.1775        """1776        p_authorize.return_value = True1777        p_rule_apply.return_value = False1778        conditions = [1779            {"type": "resource_type_is", "meta_info": "VM"}1780        ]1781        self._create_rule('rule1', conditions=conditions)1782        valid_body = {'resources': []}1783        valid_body['resources'].append(1784            {1785                'cloud_resource_id': 'res_id_%s' % self.gen_id(),1786                'name': 'resource',1787                'resource_type': 'VM'1788            }1789        )1790        valid_body['resources'].append(1791            {1792                'cloud_resource_id': 'res_id_%s' % self.gen_id(),1793                'name': 'resource1',1794                'resource_type': 'VM1'1795            }1796        )1797        valid_body['resources'].append(1798            {1799                'cloud_resource_id': 'res_id_%s' % self.gen_id(),1800                'name': 'resource2',1801                'resource_type': 'vm'1802            }1803        )1804        code, result = self.cloud_resource_create_bulk(1805            self.cloud_acc_id, valid_body, return_resources=True)1806        self.assertEqual(code, 200)1807        expected_map = {1808            'resource': {1809                'pool_id': self.org_pool_id,1810                'employee_id': self.employee['id']1811            },1812            'resource1': {1813                'pool_id': self.org_pool_id,1814                'employee_id': self.employee['id']1815            },1816            'resource2': {1817                'pool_id': self.org_pool_id,1818                'employee_id': self.employee['id']1819            },1820        }1821        self._verify_assignments(result['resources'], expected_map)1822    @patch(AUTHORIZE_ACTION_METHOD)1823   
 @patch(RULE_APPLY_CONDITION_CLOUD_IS)1824    def test_apply_rules_region(self, p_rule_apply, p_authorize):1825        """1826        Test checks creation resources in case of res region condition in rule1827        Steps:1828           - 1. Create rule with region_is condition rule11829           - 2. Create resource with type matched rule1 and two more resource1830             which are not1831           - 3. Verify assignment.1832        """1833        p_authorize.return_value = True1834        p_rule_apply.return_value = False1835        conditions = [1836            {"type": "region_is", "meta_info": "test_region"}1837        ]1838        self._create_rule('rule1', conditions=conditions,1839                          pool_id=self.sub_pools_ids[0],1840                          owner_id=self.employee2['id'])1841        valid_body = {'resources': []}1842        valid_body['resources'].append(1843            {1844                'cloud_resource_id': 'res_id_%s' % self.gen_id(),1845                'name': 'resource',1846                'region': 'test_region1',1847                'resource_type': 'VM'1848            }1849        )1850        # matched resource1851        valid_body['resources'].append(1852            {1853                'cloud_resource_id': 'res_id_%s' % self.gen_id(),1854                'name': 'resource1',1855                'region': 'test_region',1856                'resource_type': 'VM1'1857            }1858        )1859        valid_body['resources'].append(1860            {1861                'cloud_resource_id': 'res_id_%s' % self.gen_id(),1862                'name': 'resource2',1863                'region': 'test_region3',1864                'resource_type': 'vm'1865            }1866        )1867        code, result = self.cloud_resource_create_bulk(1868            self.cloud_acc_id, valid_body, return_resources=True)1869        self.assertEqual(code, 200)1870        expected_map = {1871            'resource': {1872                
'pool_id': self.org_pool_id,1873                'employee_id': self.employee['id']1874            },1875            'resource1': {1876                'pool_id': self.sub_pools_ids[0],1877                'employee_id': self.employee2['id']1878            },1879            'resource2': {1880                'pool_id': self.org_pool_id,1881                'employee_id': self.employee['id']1882            },1883        }1884        self._verify_assignments(result['resources'], expected_map)1885    @patch(AUTHORIZE_ACTION_METHOD)1886    def test_apply_rules_cloud_is(self, p_authorize):1887        """1888        Test checks creation resources in case of cloud condition in rule1889        Steps:1890           - 1. Create rule with cloud_is condition rule11891           - 2. Create resources for right cloud1892           - 3. Verify assignment.1893           - 4. Create resources for wrong cloud1894           - 5. Verify assignment.1895           - 6. Verify that get rules returns related cloud acc entities1896        """1897        p_authorize.return_value = True1898        auth_user_id_1 = self.gen_id()1899        _, employee1 = self.client.employee_create(1900            self.org_id, {'name': 'name1', 'auth_user_id': auth_user_id_1})1901        conditions = [1902            {"type": "cloud_is", "meta_info": self.cloud_acc_id}1903        ]1904        self._create_rule('rule1', conditions=conditions,1905                          pool_id=self.sub_pools_ids[0],1906                          owner_id=employee1['id'])1907        valid_body = {'resources': []}1908        valid_body['resources'].append(1909            {1910                'cloud_resource_id': 'res_id_%s' % self.gen_id(),1911                'name': 'resource',1912                'resource_type': 'VM'1913            }1914        )1915        valid_body['resources'].append(1916            {1917                'cloud_resource_id': 'res_id_%s' % self.gen_id(),1918                'name': 'resource1',1919                
'resource_type': 'VM1'1920            }1921        )1922        valid_body['resources'].append(1923            {1924                'cloud_resource_id': 'res_id_%s' % self.gen_id(),1925                'name': 'resource2',1926                'resource_type': 'vm'1927            }1928        )1929        code, result = self.cloud_resource_create_bulk(1930            self.cloud_acc_id, valid_body, return_resources=True)1931        self.assertEqual(code, 200)1932        expected_map = {1933            'resource': {1934                'pool_id': self.sub_pools_ids[0],1935                'employee_id': employee1['id']1936            },1937            'resource1': {1938                'pool_id': self.sub_pools_ids[0],1939                'employee_id': employee1['id']1940            },1941            'resource2': {1942                'pool_id': self.sub_pools_ids[0],1943                'employee_id': employee1['id']1944            },1945        }1946        self._verify_assignments(result['resources'], expected_map)1947        aws_cloud_acc1 = {1948            'name': 'my cloud_acc1',1949            'type': 'aws_cnr',1950            'config': {1951                'access_key_id': 'key',1952                'secret_access_key': 'secret',1953                'config_scheme': 'create_report'1954            }1955        }1956        _, cloud_acc1 = self.create_cloud_account(1957            self.org_id, aws_cloud_acc1, auth_user_id=auth_user_id_1)1958        cloud_acc_id = cloud_acc1['id']1959        valid_body = {'resources': []}1960        valid_body['resources'].append(1961            {1962                'cloud_resource_id': 'res_id_%s' % self.gen_id(),1963                'name': 'resource',1964                'resource_type': 'VM'1965            }1966        )1967        valid_body['resources'].append(1968            {1969                'cloud_resource_id': 'res_id_%s' % self.gen_id(),1970                'name': 'resource1',1971                'resource_type': 'VM1'1972     
       }1973        )1974        valid_body['resources'].append(1975            {1976                'cloud_resource_id': 'res_id_%s' % self.gen_id(),1977                'name': 'resource2',1978                'resource_type': 'vm'1979            }1980        )1981        code, result = self.cloud_resource_create_bulk(1982            cloud_acc_id, valid_body, return_resources=True)1983        self.assertEqual(code, 200)1984        expected_map = {1985            'resource': {1986                'pool_id': self.org_pool_id,1987                'employee_id': self.employee['id']1988            },1989            'resource1': {1990                'pool_id': self.org_pool_id,1991                'employee_id': self.employee['id']1992            },1993            'resource2': {1994                'pool_id': self.org_pool_id,1995                'employee_id': self.employee['id']1996            },1997        }1998        self._verify_assignments(result['resources'], expected_map)1999        code, rules = self.client.rules_list(2000            organization_id=self.org_id)2001        self.assertEqual(code, 200)2002        self.assertIn(self.cloud_acc_id, rules['entities'])2003    @patch(AUTHORIZE_ACTION_METHOD)2004    def test_apply_rules_with_deleted_condition(self, p_authorize):2005        """2006        Test checks that deleted conditions don't apply on filtering.2007        Steps:2008           - 1. Create rule with one condition.2009           - 2. Modify rule. Add one new condition, remove existing one.2010           - 3. Create resource matches to the new condition,2011             but doesn't match to the deleted one.2012           - 3. 
Verify assignments.2013        """2014        p_authorize.return_value = True2015        conditions = [2016            {"type": "name_starts_with", "meta_info": "qa"}2017        ]2018        rule = self._create_rule('full_pair_rule1', conditions=conditions)2019        conditions = [2020            {"type": "name_starts_with", "meta_info": "prod"}2021        ]2022        code, patched_response = self.client.rule_update(2023            rule['id'],2024            {'conditions': conditions}2025        )2026        resources = self._create_resources(2027            [2028             'prod_resource_prod',2029            ]2030        )2031        expected_map = {2032            'prod_resource_prod': {2033                'pool_id': self.org_pool_id,2034                'employee_id': self.employee['id']2035            }2036        }2037        self._verify_assignments(resources, expected_map)2038    @patch(AUTHORIZE_ACTION_METHOD)2039    def test_rule_prioritize(self, p_authorize):2040        p_authorize.return_value = True2041        for rule_name in ['rule1', 'rule2', 'rule3', 'rule4']:2042            conditions = [{"type": "name_starts_with", "meta_info": rule_name}]2043            self._create_rule(rule_name, conditions=conditions)2044        code, rules = self.client.rules_list(self.org_id)2045        self.assertEqual(code, 200)2046        initial_rule_order = [r['id'] for r in rules['rules']]2047        # prioritize third rule2048        expected_new_order = [initial_rule_order[2], initial_rule_order[0],2049                              initial_rule_order[1], initial_rule_order[3],2050                              initial_rule_order[4]]2051        code, response = self.client.rule_prioritize(initial_rule_order[2])2052        self.assertEqual(code, 200)2053        new_order = [r['id'] for r in response['rules']]2054        self.assertEqual(expected_new_order, new_order)2055        code, list_response = self.client.rules_list(self.org_id)2056        
self.assertEqual(code, 200)2057        self.assertEqual(list_response, response)2058    @patch(AUTHORIZE_ACTION_METHOD)2059    def test_rule_prioritize_first(self, p_authorize):2060        p_authorize.return_value = True2061        for rule_name in ['rule1', 'rule2', 'rule3', 'rule4']:2062            conditions = [{"type": "name_starts_with", "meta_info": rule_name}]2063            self._create_rule(rule_name, conditions=conditions)2064        code, rules = self.client.rules_list(self.org_id)2065        self.assertEqual(code, 200)2066        initial_rule_order = [r['id'] for r in rules['rules']]2067        code, response = self.client.rule_prioritize(initial_rule_order[0])2068        self.assertEqual(code, 200)2069        new_order = [r['id'] for r in response['rules']]2070        self.assertEqual(initial_rule_order, new_order)2071    @patch(AUTHORIZE_ACTION_METHOD)2072    def test_rule_promote(self, p_authorize):2073        p_authorize.return_value = True2074        for rule_name in ['rule1', 'rule2', 'rule3', 'rule4']:2075            conditions = [{"type": "name_starts_with", "meta_info": rule_name}]2076            self._create_rule(rule_name, conditions=conditions)2077        code, rules = self.client.rules_list(self.org_id)2078        self.assertEqual(code, 200)2079        initial_rule_order = [r['id'] for r in rules['rules']]2080        # promote fourth rule2081        expected_new_order = [initial_rule_order[0], initial_rule_order[1],2082                              initial_rule_order[3], initial_rule_order[2], initial_rule_order[4]]2083        code, resp = self.client.rule_promote(initial_rule_order[0])2084        self.assertEqual(code, 200)2085        code, response = self.client.rule_promote(initial_rule_order[3])2086        self.assertEqual(code, 200)2087        new_order = [r['id'] for r in response['rules']]2088        self.assertEqual(expected_new_order, new_order)2089    @patch(AUTHORIZE_ACTION_METHOD)2090    def test_rule_demote(self, 
p_authorize):2091        p_authorize.return_value = True2092        for rule_name in ['rule1', 'rule2', 'rule3', 'rule4']:2093            conditions = [{"type": "name_starts_with", "meta_info": rule_name}]2094            self._create_rule(rule_name, conditions=conditions)2095        code, rules = self.client.rules_list(self.org_id)2096        self.assertEqual(code, 200)2097        initial_rule_order = [r['id'] for r in rules['rules']]2098        # demote first rule2099        expected_new_order = [initial_rule_order[1], initial_rule_order[0],2100                              initial_rule_order[2], initial_rule_order[4],2101                              initial_rule_order[3]]2102        code, resp = self.client.rule_demote(initial_rule_order[3])2103        self.assertEqual(code, 200)2104        code, response = self.client.rule_demote(initial_rule_order[0])2105        self.assertEqual(code, 200)2106        new_order = [r['id'] for r in response['rules']]2107        self.assertEqual(expected_new_order, new_order)2108    @patch(AUTHORIZE_ACTION_METHOD)2109    def test_rule_deprioritize(self, p_authorize):2110        p_authorize.return_value = True2111        for rule_name in ['rule1', 'rule2', 'rule3', 'rule4']:2112            conditions = [{"type": "name_starts_with", "meta_info": rule_name}]2113            self._create_rule(rule_name, conditions=conditions)2114        code, rules = self.client.rules_list(self.org_id)2115        self.assertEqual(code, 200)2116        initial_rule_order = [r['id'] for r in rules['rules']]2117        # deprioritize second rule2118        expected_new_order = [initial_rule_order[0], initial_rule_order[2],2119                              initial_rule_order[3], initial_rule_order[4],2120                              initial_rule_order[1]]2121        code, response = self.client.rule_deprioritize(initial_rule_order[1])2122        self.assertEqual(code, 200)2123        new_order = [r['id'] for r in response['rules']]2124        
self.assertEqual(expected_new_order, new_order)2125    @patch(AUTHORIZE_ACTION_METHOD)2126    @patch(RULE_APPLY_CONDITION_CLOUD_IS)2127    def test_invalid_rule_deactivated(self, p_rule_apply, p_authorize):2128        p_authorize.return_value = True2129        p_rule_apply.return_value = False2130        m_activities_publish = patch(2131            'rest_api_server.controllers.base.'2132            'BaseController.publish_activities_task').start()2133        conditions = [2134            {"type": "name_starts_with", "meta_info": "prod"}2135        ]2136        rule = self._create_rule('rule1', conditions=conditions,2137                                 pool_id=self.sub_pools_ids[2],2138                                 owner_id=self.employee3['id'],2139                                 set_allowed=False)2140        resources = self._create_resources(['prod_resource_prod'])2141        code, rule = self.client.rule_get(rule['id'])2142        self.assertEqual(code, 200)2143        self.assertFalse(rule['active'])2144        m_activities_publish.assert_has_calls([2145            call(rule['organization_id'], rule['id'], 'rule',2146                 'rule_deactivated', ANY, 'rule.rule_deactivated',2147                 add_token=True)])2148class TestRulesApplyApi(TestRulesApiBase):2149    def setUp(self, version='v2'):2150        super().setUp()2151        self._mock_auth_user(self.user_id)2152        patch('rest_api_server.handlers.v1.base.BaseAuthHandler._get_user_info',2153              return_value=self.user).start()2154        patch(AUTHORIZE_ACTION_METHOD, return_value=True).start()2155        patch('rest_api_server.controllers.cloud_account.'2156              'CloudAccountController._configure_report').start()2157        aws_cloud_acc = {2158            'name': 'my cloud_acc',2159            'type': 'aws_cnr',2160            'config': {2161                'access_key_id': 'key',2162                'secret_access_key': 'secret',2163                'config_scheme': 
'create_report'2164            }2165        }2166        _, self.cloud_acc = self.create_cloud_account(2167            self.org_id, aws_cloud_acc, auth_user_id=self.user_id)2168        _, cloud_acc_rule = self.client.rules_list(self.org_id)2169        self.cloud_acc_rule = cloud_acc_rule['rules'][0]2170        self.cloud_acc_id = self.cloud_acc['id']2171    def _create_resources(self, names, pool_id=None, employee_id=None,2172                          tags=None):2173        valid_body = {'resources': []}2174        for name in names:2175            resource_dict = {2176                'cloud_resource_id': 'res_id_%s' % self.gen_id(),2177                'name': name,2178                'resource_type': 'VM',2179            }2180            if not pool_id:2181                pool_id = self.org_pool_id2182            if not employee_id:2183                employee_id = self.employee['id']2184            resource_dict['pool_id'] = pool_id2185            resource_dict['employee_id'] = employee_id2186            if tags:2187                resource_dict['tags'] = tags2188            valid_body['resources'].append(resource_dict)2189        code, result = self.cloud_resource_create_bulk(2190            self.cloud_acc_id, valid_body, return_resources=True)2191        self.assertEqual(code, 200)2192        return result['resources']2193    def test_rules_apply_api_invalid_params(self):2194        code, resp = self.client.rules_apply(self.org_id, pool_id=1)2195        self.assertEqual(code, 400)2196        self.verify_error_code(resp, 'OE0214')2197        code, resp = self.client.rules_apply(self.org_id, pool_id='  ')2198        self.assertEqual(code, 400)2199        self.verify_error_code(resp, 'OE0416')2200        code, resp = self.client.rules_apply(self.org_id, pool_id='')2201        self.assertEqual(code, 400)2202        self.verify_error_code(resp, 'OE0215')2203        code, resp = self.client.rules_apply(self.org_id, pool_id='a' * 500)2204        self.assertEqual(code, 
400)2205        self.verify_error_code(resp, 'OE0215')2206        code, resp = self.client.rules_apply(2207            self.org_id, pool_id=self.org_pool_id, include_children='a')2208        self.assertEqual(code, 400)2209        self.verify_error_code(resp, 'OE0226')2210        body_unexpected = {'pool_id': self.org_pool_id, 'aa': 'bb'}2211        code, resp = self.client.post(self.client.rules_apply_url(self.org_id),2212                                      body_unexpected)2213        self.assertEqual(code, 400)2214        self.verify_error_code(resp, 'OE0212')2215    def verify_assignment(self, resource_id, pool_id, employee_id):2216        code, resource = self.client.cloud_resource_get(resource_id)2217        self.assertEqual(code, 200)2218        self.assertEqual(resource['pool_id'], pool_id)2219        self.assertEqual(resource['employee_id'], employee_id)2220    def test_rules_apply_api_include_children(self):2221        sub_pool_id = self.sub_pools[0]['id']2222        code, sub_sub_pool = self.client.pool_create(2223            self.org_id,2224            {'name': "sub_sub_pool",2225             'parent_id': sub_pool_id})2226        self.assertEqual(code, 201)2227        sub_sub_pool_id = sub_sub_pool['id']2228        # root pool2229        resources2 = self._create_resources(2230            ['my_2', 'm3'], pool_id=self.org_pool_id,2231            employee_id=self.employee['id'])2232        # sub pool2233        resources3 = self._create_resources(2234            ['my_3', 'm4'], pool_id=sub_pool_id,2235            employee_id=self.employee['id'])2236        # sub sub pool2237        resources4 = self._create_resources(2238            ['my_4', 'm5'], pool_id=sub_sub_pool_id,2239            employee_id=self.employee['id'])2240        conditions = [2241            {"type": "name_starts_with", "meta_info": "my_"}2242        ]2243        self._create_rule('rule1', conditions=conditions, set_allowed=True)2244        # only processing resources from root2245      
  code, resp = self.client.rules_apply(self.org_id,2246                                             pool_id=self.org_pool_id)2247        self.assertEqual(code, 201)2248        self.assertEqual(resp['processed'], 2)2249        self.assertEqual(resp['updated_assignments'], 1)2250        self.verify_assignment(resources2[0]['id'], self.org_pool_id,2251                               self.employee['id'])2252        self.verify_assignment(2253            resources2[1]['id'], self.cloud_acc_rule['pool_id'],2254            self.employee['id'])2255        # processing resources from sub and sub sub2256        code, resp = self.client.rules_apply(2257            self.org_id, pool_id=sub_pool_id, include_children=True)2258        self.assertEqual(code, 201)2259        self.assertEqual(resp['processed'], 4)2260        self.assertEqual(resp['updated_assignments'], 4)2261        self.verify_assignment(resources3[0]['id'], self.org_pool_id,2262                               self.employee['id'])2263        self.verify_assignment(2264            resources3[1]['id'], self.cloud_acc_rule['pool_id'],2265            self.employee['id'])2266        self.verify_assignment(resources4[0]['id'], self.org_pool_id,2267                               self.employee['id'])2268        self.verify_assignment(2269            resources4[1]['id'], self.cloud_acc_rule['pool_id'],2270            self.employee['id'])2271    def test_reapply_deleted_rule(self):2272        self._create_resources(['my_1'])2273        rule = self._create_rule('rule1', conditions=[2274            {"type": "name_starts_with", "meta_info": "m"}2275        ], set_allowed=True)2276        code, _ = self.client.rules_apply(self.org_id,2277                                          pool_id=self.org_pool_id)2278        self.assertEqual(code, 201)2279        self.client.rule_delete(rule['id'])2280        rule2 = self._create_rule('rule2', conditions=[2281            {"type": "name_starts_with", "meta_info": "my_"}2282        ], 
pool_id=self.sub_pools_ids[0], set_allowed=True)2283        m_activities_publish = patch(2284            'rest_api_server.controllers.base.'2285            'BaseController.publish_activities_task').start()2286        code, _ = self.client.rules_apply(2287            self.org_id, pool_id=self.org_pool_id)2288        self.assertEqual(code, 201)2289        m_activities_publish.assert_has_calls([2290            call(self.org_id, self.org_id, 'organization',2291                 'rules_processing_started', ANY,2292                 'organization.rules_processing_started', add_token=True),2293            call(self.org_id, rule2['id'], 'rule',2294                 'rule_applied', ANY, 'rule.rule_applied', add_token=True),2295            call(self.org_id, self.org_id, 'organization',2296                 'rules_processing_completed', ANY,2297                 'organization.rules_processing_completed', add_token=True),2298        ])2299    def test_rules_apply_api_clusters(self):2300        m_activities_publish = patch(2301            'rest_api_server.controllers.base.'2302            'BaseController.publish_activities_task').start()2303        code, c_type = self.client.cluster_type_create(2304            self.org_id, {'name': 'c_type', 'tag_key': 'tn'})2305        self.assertEqual(code, 201)2306        resources = self._create_resources(['my_1', 'm2'], tags={'tn': 'tv'})2307        conditions = [2308            {"type": "tag_exists", "meta_info": "tn"}2309        ]2310        rule = self._create_rule('rule1', conditions=conditions,2311                                 set_allowed=True)2312        code, resp = self.client.rules_apply(self.org_id, self.org_pool_id)2313        m_activities_publish.assert_has_calls([2314            call(self.org_id, self.org_id, 'organization',2315                 'rules_processing_started', ANY,2316                 'organization.rules_processing_started', add_token=True),2317            call(self.org_id, rule['id'], 'rule',2318                 
'rule_applied', ANY, 'rule.rule_applied', add_token=True),2319            call(self.org_id, self.org_id, 'organization',2320                 'rules_processing_completed', ANY,2321                 'organization.rules_processing_completed', add_token=True),2322        ])2323        self.assertEqual(code, 201)2324        self.assertEqual(resp['processed'], 1)...fuzzy_object.py
Source:fuzzy_object.py  
...159            self.sim = None160            return161        if fuzzy_type == FuzzyType.BOTH:162            rules = []163            rules.append( self._create_rule("N", "N", "N") )164            rules.append( self._create_rule("N", "Z", "N") )165            rules.append( self._create_rule("N", "P", "Z") )166            rules.append( self._create_rule("Z", "N", "N") )167            rules.append( self._create_rule("Z", "Z", "Z") )168            rules.append( self._create_rule("Z", "P", "P") )169            rules.append( self._create_rule("P", "N", "Z") )170            rules.append( self._create_rule("P", "Z", "P") )171            rules.append( self._create_rule("P", "P", "P") )172#             rules.append( self._create_rule("NB", "NB", "NB") )173#             rules.append( self._create_rule("NB", "NS", "NB") )174#             rules.append( self._create_rule("NB",  "Z", "NS") )175#             rules.append( self._create_rule("NB", "PS", "NS") )176#             rules.append( self._create_rule("NB", "PB",  "Z") )177#178#             rules.append( self._create_rule("NS", "NB", "NB") )179#             rules.append( self._create_rule("NS", "NS", "NS") )180#             rules.append( self._create_rule("NS",  "Z", "NS") )181#             rules.append( self._create_rule("NS", "PS",  "Z") )182#             rules.append( self._create_rule("NS", "PB", "PS") )183#184#             rules.append( self._create_rule("Z", "NB", "NS") )185#             rules.append( self._create_rule("Z", "NS", "NS") )186#             rules.append( self._create_rule("Z",  "Z",  "Z") )187#             rules.append( self._create_rule("Z", "PS", "PS") )188#             rules.append( self._create_rule("Z", "PB", "PS") )189#190#             rules.append( self._create_rule("PS", "NB", "NS") )191#             rules.append( self._create_rule("PS", "NS",  "Z") )192#             rules.append( self._create_rule("PS",  "Z", "PS") )193#             rules.append( self._create_rule("PS", "PS", "PS") )194#        
     rules.append( self._create_rule("PS", "PB", "PB") )195#196#             rules.append( self._create_rule("PB", "NB",  "Z") )197#             rules.append( self._create_rule("PB", "NS", "PS") )198#             rules.append( self._create_rule("PB",  "Z", "PS") )199#             rules.append( self._create_rule("PB", "PS", "PB") )200#             rules.append( self._create_rule("PB", "PB", "PB") )201            self.output_ctrl = ctrl.ControlSystem( rules )202            self.sim = ctrl.ControlSystemSimulation(self.output_ctrl)203#             self._plots.request_plot( self.output_ctrl, "rules" )204            return205        if fuzzy_type == FuzzyType.ERR:206            rules = []207            rules.append( ctrl.Rule(self.ant_err["N"], self.con_output["N"]) )208            rules.append( ctrl.Rule(self.ant_err["Z"], self.con_output["Z"]) )209            rules.append( ctrl.Rule(self.ant_err["P"], self.con_output["P"]) )210#             rules.append( ctrl.Rule(self.ant_err["NB"], self.con_output["NB"]) )211#             rules.append( ctrl.Rule(self.ant_err["NS"], self.con_output["NS"]) )212#             rules.append( ctrl.Rule(self.ant_err[ "Z"], self.con_output[ "Z"]) )213#             rules.append( ctrl.Rule(self.ant_err["PS"], self.con_output["PS"]) )214#             rules.append( ctrl.Rule(self.ant_err["PB"], self.con_output["PB"]) )215            self.output_ctrl = ctrl.ControlSystem( rules )216            self.sim = ctrl.ControlSystemSimulation(self.output_ctrl)217#             self._plots.request_plot( self.output_ctrl, "rules" )218            return219        if fuzzy_type == FuzzyType.DERR:220            rules = []221            rules.append( ctrl.Rule(self.ant_derr["N"], self.con_output["N"]) )222            rules.append( ctrl.Rule(self.ant_derr["Z"], self.con_output["Z"]) )223            rules.append( ctrl.Rule(self.ant_derr["P"], self.con_output["P"]) )224#             rules.append( ctrl.Rule(self.ant_derr["NB"], self.con_output["NB"]) )225#             
rules.append( ctrl.Rule(self.ant_derr["NS"], self.con_output["NS"]) )226#             rules.append( ctrl.Rule(self.ant_derr[ "Z"], self.con_output[ "Z"]) )227#             rules.append( ctrl.Rule(self.ant_derr["PS"], self.con_output["PS"]) )228#             rules.append( ctrl.Rule(self.ant_derr["PB"], self.con_output["PB"]) )229            self.output_ctrl = ctrl.ControlSystem( rules )230            self.sim = ctrl.ControlSystemSimulation(self.output_ctrl)231#             self._plots.request_plot( self.output_ctrl, "rules" )232            return233        self.sim = None234        raise ValueError('Invalid state: ' + fuzzy_type)235    def reset_state(self):236        self.error = Error()237    @synchronized238    def set_err_range(self, value):239        absval = abs(value)240        if absval < 0.1:241            self.ant_err = None242            return243        invert = False244        if value < 0.0:245            invert = True246        self.ant_err = ctrl.Antecedent(np.arange(-absval, absval, 1), 'err')247        self.ant_err.automf(names=["N", "Z", "P"], invert=invert)248        ##self.ant_err.automf(names=["NB", "NS", "Z", "PS", "PB"], invert=invert)249        self._plots.request_plot( self.ant_err, "err" )250    @synchronized251    def set_derr_range(self, value):252        absval = abs(value)253        if absval < 0.1:254            self.ant_derr = None255            return256        invert = False257        if value < 0.0:258            invert = True259        self.ant_derr = ctrl.Antecedent(np.arange(-absval, absval, 1), 'derr')260        self.ant_derr.automf(names=["N", "Z", "P"], invert=invert)261        ##self.ant_derr.automf(names=["NB", "NS", "Z", "PS", "PB"], invert=invert)262        self._plots.request_plot( self.ant_derr, "derr" )263    @synchronized264    def set_output_range(self, value):265        absval = abs(value)266        if absval < 0.1:267            self.con_output = None268            return269        invert = False270        if 
value < 0.0:271            invert = True272        self.con_output = ctrl.Consequent(np.arange(-absval, absval, 1), 'output')273        self.con_output.automf(names=["N", "Z", "P"], invert=invert)274        ##self.con_output.automf(names=["NB", "NS", "Z", "PS", "PB"], invert=invert)275        self._plots.request_plot( self.con_output, "output" )276    @synchronized277    def compute(self, valueInput):278        fuzzy_type = self.type()279        if fuzzy_type == FuzzyType.INVALID:280            self.error.calculate(valueInput)281            return 0282        err = self.error.calculate(valueInput)283        if fuzzy_type == FuzzyType.BOTH or fuzzy_type == FuzzyType.ERR:284            self.sim.input['err'] = err[0]285        if fuzzy_type == FuzzyType.BOTH or fuzzy_type == FuzzyType.DERR:286            self.sim.input['derr'] = err[1]287        self.sim.compute()288        self._plots.process_requests()289        output = self.sim.output['output']290        return output291    def close(self):292        self._plots.close_all()293    def _create_rule(self, err, derr, out):294        rule = ctrl.Rule(self.ant_err[err] | self.ant_derr[derr], self.con_output[out])295        return rule296class FuzzyObject():297    def __init__(self, controller_name, err_param=90, derr_param=30, output_param=20):298        self.controller_name = controller_name299        self.fuzzy = Fuzzy(err_param, derr_param, output_param)300        rospy.Subscriber("/self_balancer/" + self.controller_name + "/err_max", Float64, self._err_callback)301        rospy.Subscriber("/self_balancer/" + self.controller_name + "/derr_max", Float64, self._derr_callback)302        rospy.Subscriber("/self_balancer/" + self.controller_name + "/output_max", Float64, self._output_callback)303        self.output_pub = rospy.Publisher("/self_balancer/" + self.controller_name + "/output", Float64, queue_size=10)304        self.err_pub = rospy.Publisher("/self_balancer/" + self.controller_name + "/err", Float64, 
queue_size=10)305        self.derr_pub = rospy.Publisher("/self_balancer/" + self.controller_name + "/derr", Float64, queue_size=10)306    def reset_state(self):307        rospy.loginfo("resetting Fuzzy")...test_rules.py
Source:test_rules.py  
2from tracpro.test.cases import TracProTest3from .. import rules4from . import factories5class TestGetCategory(TracProTest):6    def _create_rule(self, category):7        return {8            'category': category,9            'test': {},10        }11    def test_get_category_direct(self):12        """Return rule['category'] if it is not a dictionary."""13        rule = self._create_rule('cats')14        self.assertEqual(rules.get_category(rule), "cats")15    def test_get_category_base(self):16        """get_category returns the base category name, if available."""17        rule = self._create_rule({'base': 'base', 'eng': 'eng'})18        self.assertEqual(rules.get_category(rule), "base")19    def test_get_category_other(self):20        """get_category returns the first available translation."""21        rule = self._create_rule({'eng': 'eng'})22        self.assertEqual(rules.get_category(rule), "eng")23class TestGetAllCategories(TracProTest):24    def _create_rule(self, category):25        return {26            'category': {'base': category},27            'test': {},28        }29    def test_get_all_no_rules(self):30        question = factories.Question(rules=[])31        self.assertEqual(rules.get_all_categories(question), ["Other"])32    def test_get_all(self):33        question = factories.Question(rules=[34            self._create_rule('a'),35            self._create_rule('b'),36        ])37        self.assertEqual(38            rules.get_all_categories(question),39            ['a', 'b', 'Other'])40    def test_get_all__include_other(self):41        """Ensure that "Other" is at the end of the list."""42        question = factories.Question(rules=[43            self._create_rule('a'),44            self._create_rule('Other'),45            self._create_rule('b'),46        ])47        self.assertEqual(48            rules.get_all_categories(question),49            ['a', 'b', 'Other'])50    def test_get_all_with_answers(self):51        """Include Answer categories 
that aren't in the rules."""52        question = factories.Question(rules=[53            self._create_rule('a'),54            self._create_rule('b'),55        ])56        factories.Answer(question=question, category='apple')57        factories.Answer(question=question, category='a')58        self.assertEqual(59            rules.get_all_categories(question, answers=question.answers.all()),60            ['a', 'b', 'apple', 'Other'])61    def test_duplicates(self):62        question = factories.Question(rules=[63            self._create_rule('b'),64            self._create_rule('a'),65            self._create_rule('a'),66            self._create_rule('b'),67        ])68        self.assertEqual(69            rules.get_all_categories(question),70            ['b', 'a', 'Other'])71class CheckRuleTestBase(object):72    def call(self, test):73        val, kwargs = test74        return self.rule(val, **kwargs)75    def test_false(self):76        unexpected_successes = []77        for test in self.false_tests:78            if self.call(test) is not False:79                unexpected_successes.append(test)80        if unexpected_successes:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
