Best Python code snippet using localstack_python
test_agnostic.py
Source:test_agnostic.py  
...57        def check_equal(actual, correct):58            e = helper.datalog_equal(actual, correct)59            self.assertTrue(e)60        run = agnostic.Runtime()61        run.create_policy('th1')62        run.create_policy('th2')63        events1 = [compile.Event(formula=x, insert=True, target='th1')64                   for x in helper.str2pol("p(1) p(2) q(1) q(3)")]65        events2 = [compile.Event(formula=x, insert=True, target='th2')66                   for x in helper.str2pol("r(1) r(2) t(1) t(4)")]67        run.update(events1 + events2)68        check_equal(run.select('p(x)', 'th1'), 'p(1) p(2)')69        check_equal(run.select('q(x)', 'th1'), 'q(1) q(3)')70        check_equal(run.select('r(x)', 'th2'), 'r(1) r(2)')71        check_equal(run.select('t(x)', 'th2'), 't(1) t(4)')72    def test_initialize_tables(self):73        """Test initialize_tables() functionality of agnostic."""74        run = agnostic.Runtime()75        run.create_policy('test')76        run.insert('p(1) p(2)')77        facts = [compile.Fact('p', (3,)), compile.Fact('p', (4,))]78        run.initialize_tables(['p'], facts)79        e = helper.datalog_equal(run.select('p(x)'), 'p(3) p(4)')80        self.assertTrue(e)81    def test_single_policy(self):82        """Test ability to create/delete single policies."""83        # single policy84        run = agnostic.Runtime()85        original = run.policy_names()86        run.create_policy('test1')87        run.insert('p(x) :- q(x)', 'test1')88        run.insert('q(1)', 'test1')89        self.assertEqual(90            run.select('p(x)', 'test1'), 'p(1)', 'Policy creation')91        self.assertEqual(92            run.select('p(x)', 'test1'), 'p(1)', 'Policy creation')93        run.delete_policy('test1')94        self.assertEqual(95            set(run.policy_names()), set(original), 'Policy deletion')96    def test_multi_policy(self):97        """Test ability to create/delete multiple policies."""98        # multiple policies99        run = 
agnostic.Runtime()100        original = run.policy_names()101        run.create_policy('test2')102        run.create_policy('test3')103        self.assertEqual(104            set(run.policy_names()),105            set(original + ['test2', 'test3']),106            'Multi policy creation')107        run.delete_policy('test2')108        run.create_policy('test4')109        self.assertEqual(110            set(run.policy_names()),111            set(original + ['test3', 'test4']),112            'Multiple policy deletion')113        run.insert('p(x) :- q(x)  q(1)', 'test4')114        self.assertEqual(115            run.select('p(x)', 'test4'),116            'p(1)',117            'Multipolicy deletion select')118    def test_cross_policy_rule(self):119        """Test rule that refer to table from another policy."""120        run = agnostic.Runtime()121        run.create_policy('test1')122        run.create_policy('test2')123        run.create_policy('test3')124        run.insert(125            'p(x) :- test1:q(x),test2:q(x),test3:q(x),q(x)  q(1) q(2) q(3)',126            'test3')127        run.insert('q(1)', 'test1')128        run.insert('q(1) q(2)', 'test2')129        self.assertEqual(130            run.select('p(x)', 'test3'),131            'p(1)',132            'Cross-policy rule select')133    def test_policy_types(self):134        """Test types for multiple policies."""135        # policy types136        run = agnostic.Runtime()137        run.create_policy('test1', kind=datalog_base.NONRECURSIVE_POLICY_TYPE)138        self.assertIsInstance(run.policy_object('test1'),139                              nonrecursive.NonrecursiveRuleTheory,140                              'Nonrecursive policy addition')141        run.create_policy('test2', kind=datalog_base.ACTION_POLICY_TYPE)142        self.assertIsInstance(run.policy_object('test2'),143                              nonrecursive.ActionTheory,144                              'Action policy addition')145        
run.create_policy('test3', kind=datalog_base.DATABASE_POLICY_TYPE)146        self.assertIsInstance(run.policy_object('test3'),147                              database.Database,148                              'Database policy addition')149        run.create_policy('test4', kind=datalog_base.MATERIALIZED_POLICY_TYPE)150        self.assertIsInstance(run.policy_object('test4'),151                              materialized.MaterializedViewTheory,152                              'Materialized policy addition')153    def test_policy_errors(self):154        """Test errors for multiple policies."""155        # errors156        run = agnostic.Runtime()157        run.create_policy('existent')158        self.assertRaises(KeyError, run.create_policy, 'existent')159        self.assertRaises(KeyError, run.delete_policy, 'nonexistent')160        self.assertRaises(KeyError, run.policy_object, 'nonexistent')161    def test_wrong_arity_index(self):162        run = agnostic.Runtime()163        run.create_policy('test1')164        run.insert('p(x) :- r(x), q(y, x)')165        run.insert('r(1)')166        run.insert('q(1,1)')167        # run query first to build index168        self.assertTrue(helper.datalog_equal(run.select('p(x)'), 'p(1)'))169        # next insert causes an exceptions since the thing we indexed on170        #   doesn't exist171        permitted, errs = run.insert('q(5)')172        self.assertFalse(permitted)173        self.assertEqual(len(errs), 1)174        self.assertIsInstance(errs[0], exception.PolicyException)175        # double-check that the error didn't result in an inconsistent state176        self.assertEqual(run.select('q(5)'), '')177    def test_get_tablename(self):178        run = agnostic.DseRuntime('dse')179        run.synchronizer = mock.MagicMock()180        run.create_policy('test')181        run.insert('p(x) :- q(x)')182        run.insert('q(x) :- r(x)')183        run.insert('execute[nova:disconnect(x, y)] :- s(x, y)')184        tables = 
run.get_tablename('test', 'p')185        self.assertEqual({'p'}, set(tables))186        tables = run.get_tablename('test', 't')187        self.assertIsNone(tables)188        tables = run.get_tablenames('test')189        self.assertEqual({'p', 'q', 'r', 's'}, set(tables))190        tables = run.get_tablename('test', 'nova:disconnect')191        self.assertIsNone(tables)192    def test_tablenames(self):193        run = agnostic.Runtime()194        run.create_policy('test')195        run.insert('p(x) :- q(x,y)')196        run.insert('q(x,y) :- r(x,y)')197        run.insert('t(x) :- q(x,y), r(x,z), equal(y, z)')198        run.insert('execute[nova:disconnect(x, y)] :- s(x, y)')199        tables = run.tablenames()200        self.assertEqual({'p', 'q', 'r', 's', 't', 'nova:disconnect'},201                         set(tables))202        tables = run.tablenames(include_builtin=True)203        self.assertEqual({'p', 'q', 'r', 's', 't', 'nova:disconnect', 'equal'},204                         set(tables))205        tables = run.tablenames(body_only=True)206        self.assertEqual({'q', 'r', 's'}, set(tables))207        tables = run.tablenames(include_modal=False)208        self.assertEqual({'p', 'q', 'r', 's', 't'}, set(tables))209    @mock.patch.object(db_policy_rules, 'add_policy')210    def test_persistent_create_policy(self, mock_add):211        run = agnostic.Runtime()212        policy_name = 'invalid-table-name'213        self.assertRaises(exception.PolicyException,214                          run.persistent_create_policy,215                          policy_name)216        self.assertNotIn(policy_name, run.policy_names())217    @mock.patch.object(db_policy_rules, 'add_policy', side_effect=Exception())218    def test_persistent_create_policy_with_db_exception(self, mock_add):219        run = agnostic.Runtime()220        with mock.patch.object(run, 'delete_policy') as mock_delete:221            run.synchronizer = mock.MagicMock()222            policy_name = 
'test_policy'223            self.assertRaises(exception.PolicyException,224                              run.persistent_create_policy,225                              policy_name)226            mock_add.assert_called_once_with(mock.ANY,227                                             policy_name,228                                             policy_name[:5],229                                             mock.ANY,230                                             'user',231                                             'nonrecursive')232            # mock_delete.assert_called_once_with(policy_name)233            self.assertFalse(mock_delete.called)234            self.assertFalse(run.synchronizer.sync_one_policy.called)235            self.assertNotIn('test_policy', run.policy_names())236    def test_tablenames_theory_name(self):237        run = agnostic.Runtime()238        run.create_policy('test')239        run.create_policy('test2')240        run.insert('p(x) :- q(x)', 'test')241        run.insert('r(x) :- s(x)', 'test2')242        tables = run.tablenames()243        self.assertEqual(set(tables), set(['p', 'q', 'r', 's']))244        tables = run.tablenames(theory_name='test')245        self.assertEqual(set(tables), set(['p', 'q']))246class TestArity(base.TestCase):247    def test_same_table_diff_policies(self):248        run = agnostic.Runtime()249        run.create_policy('alice')250        run.create_policy('bob')251        run.insert('p(x) :- q(x, y)', 'alice')252        run.insert('p(x, y) :- r(x, y, z)', 'bob')253        self.assertEqual(1, run.arity('p', 'alice'))254        self.assertEqual(2, run.arity('p', 'bob'))255    def test_complex_table(self):256        run = agnostic.Runtime()257        run.create_policy('alice')258        run.create_policy('bob')259        run.insert('p(x) :- q(x, y)', 'alice')260        run.insert('p(x, y) :- r(x, y, z)', 'bob')261        self.assertEqual(1, run.arity('alice:p', 'bob'))262        self.assertEqual(1, 
run.arity('alice:p', 'alice'))263    def test_modals(self):264        run = agnostic.Runtime()265        run.create_policy('alice')266        run.insert('execute[nova:p(x)] :- q(x, y)', 'alice')267        self.assertEqual(1, run.arity('nova:p', 'alice', 'execute'))268class TestTriggerRegistry(base.TestCase):269    def setUp(self):270        super(TestTriggerRegistry, self).setUp()271        self.f = lambda tbl, old, new: old272    def test_trigger(self):273        trigger1 = agnostic.Trigger('table', 'policy', self.f)274        trigger2 = agnostic.Trigger('table', 'policy', self.f)275        trigger3 = agnostic.Trigger('table2', 'policy', self.f)276        trigger4 = agnostic.Trigger('table', 'policy', lambda x: x)277        s = set()278        s.add(trigger1)279        s.add(trigger2)280        s.add(trigger3)281        s.add(trigger4)282        self.assertEqual(4, len(s))283        s.discard(trigger1)284        self.assertEqual(3, len(s))285        s.discard(trigger2)286        self.assertEqual(2, len(s))287        s.discard(trigger3)288        self.assertEqual(1, len(s))289        s.discard(trigger4)290        self.assertEqual(0, len(s))291    def test_register(self):292        g = compile.RuleDependencyGraph()293        reg = agnostic.TriggerRegistry(g)294        # register295        p_trigger = reg.register_table('p', 'alice', self.f)296        triggers = reg.relevant_triggers(['alice:p'])297        self.assertEqual(triggers, set([p_trigger]))298        # register 2nd table299        q_trigger = reg.register_table('q', 'alice', self.f)300        p_triggers = reg.relevant_triggers(['alice:p'])301        self.assertEqual(p_triggers, set([p_trigger]))302        q_triggers = reg.relevant_triggers(['alice:q'])303        self.assertEqual(q_triggers, set([q_trigger]))304        # register again with table p305        p2_trigger = reg.register_table('p', 'alice', self.f)306        p_triggers = reg.relevant_triggers(['alice:p'])307        self.assertEqual(p_triggers, 
set([p_trigger, p2_trigger]))308        q_triggers = reg.relevant_triggers(['alice:q'])309        self.assertEqual(q_triggers, set([q_trigger]))310    def test_unregister(self):311        g = compile.RuleDependencyGraph()312        reg = agnostic.TriggerRegistry(g)313        p_trigger = reg.register_table('p', 'alice', self.f)314        q_trigger = reg.register_table('q', 'alice', self.f)315        self.assertEqual(reg.relevant_triggers(['alice:p']),316                         set([p_trigger]))317        self.assertEqual(reg.relevant_triggers(['alice:q']),318                         set([q_trigger]))319        # unregister p320        reg.unregister(p_trigger)321        self.assertEqual(reg.relevant_triggers(['alice:p']), set())322        self.assertEqual(reg.relevant_triggers(['alice:q']),323                         set([q_trigger]))324        # unregister q325        reg.unregister(q_trigger)326        self.assertEqual(reg.relevant_triggers(['alice:p']), set())327        self.assertEqual(reg.relevant_triggers(['alice:q']), set())328        # unregister nonexistent trigger329        self.assertRaises(KeyError, reg.unregister, p_trigger)330        self.assertEqual(reg.relevant_triggers(['alice:p']), set())331        self.assertEqual(reg.relevant_triggers(['alice:q']), set())332    def test_basic_dependency(self):333        g = compile.RuleDependencyGraph()334        reg = agnostic.TriggerRegistry(g)335        g.formula_insert(compile.parse1('p(x) :- q(x)'), 'alice')336        # register p337        p_trigger = reg.register_table('p', 'alice', self.f)338        self.assertEqual(reg.relevant_triggers(['alice:q']), set([p_trigger]))339        self.assertEqual(reg.relevant_triggers(['alice:p']), set([p_trigger]))340        # register q341        q_trigger = reg.register_table('q', 'alice', self.f)342        self.assertEqual(reg.relevant_triggers(['alice:q']),343                         set([p_trigger, q_trigger]))344        
self.assertEqual(reg.relevant_triggers(['alice:p']),345                         set([p_trigger]))346    def test_complex_dependency(self):347        g = compile.RuleDependencyGraph()348        reg = agnostic.TriggerRegistry(g)349        g.formula_insert(compile.parse1('p(x) :- q(x)'), 'alice')350        g.formula_insert(compile.parse1('q(x) :- r(x), s(x)'), 'alice')351        g.formula_insert(compile.parse1('r(x) :- t(x, y), u(y)'), 'alice')352        g.formula_insert(compile.parse1('separate(x) :- separate2(x)'),353                         'alice')354        g.formula_insert(compile.parse1('notrig(x) :- notrig2(x)'), 'alice')355        p_trigger = reg.register_table('p', 'alice', self.f)356        sep_trigger = reg.register_table('separate', 'alice', self.f)357        # individual tables358        self.assertEqual(reg.relevant_triggers(['alice:p']), set([p_trigger]))359        self.assertEqual(reg.relevant_triggers(['alice:q']), set([p_trigger]))360        self.assertEqual(reg.relevant_triggers(['alice:r']), set([p_trigger]))361        self.assertEqual(reg.relevant_triggers(['alice:s']), set([p_trigger]))362        self.assertEqual(reg.relevant_triggers(['alice:t']), set([p_trigger]))363        self.assertEqual(reg.relevant_triggers(['alice:u']), set([p_trigger]))364        self.assertEqual(reg.relevant_triggers(['alice:notrig']), set())365        self.assertEqual(reg.relevant_triggers(['alice:notrig2']), set([]))366        self.assertEqual(reg.relevant_triggers(['alice:separate']),367                         set([sep_trigger]))368        self.assertEqual(reg.relevant_triggers(['alice:separate2']),369                         set([sep_trigger]))370        # groups of tables371        self.assertEqual(reg.relevant_triggers(['alice:p', 'alice:q']),372                         set([p_trigger]))373        self.assertEqual(reg.relevant_triggers(['alice:separate', 'alice:p']),374                         set([p_trigger, sep_trigger]))375        
self.assertEqual(reg.relevant_triggers(['alice:notrig', 'alice:p']),376                         set([p_trigger]))377        # events: data378        event = compile.Event(compile.parse1('q(1)'), target='alice')379        self.assertEqual(reg.relevant_triggers([event]), set([p_trigger]))380        event = compile.Event(compile.parse1('u(1)'), target='alice')381        self.assertEqual(reg.relevant_triggers([event]), set([p_trigger]))382        event = compile.Event(compile.parse1('separate2(1)'), target='alice')383        self.assertEqual(reg.relevant_triggers([event]), set([sep_trigger]))384        event = compile.Event(compile.parse1('notrig2(1)'), target='alice')385        self.assertEqual(reg.relevant_triggers([event]), set([]))386        # events: rules387        event = compile.Event(compile.parse1('separate(x) :- q(x)'),388                              target='alice')389        self.assertEqual(reg.relevant_triggers([event]), set([sep_trigger]))390        event = compile.Event(compile.parse1('notrig(x) :- q(x)'),391                              target='alice')392        self.assertEqual(reg.relevant_triggers([event]), set([]))393        event = compile.Event(compile.parse1('r(x) :- q(x)'), target='alice')394        self.assertEqual(reg.relevant_triggers([event]), set([p_trigger]))395        # events: multiple rules and data396        event1 = compile.Event(compile.parse1('r(x) :- q(x)'), target='alice')397        event2 = compile.Event(compile.parse1('separate2(1)'), target='alice')398        self.assertEqual(reg.relevant_triggers([event1, event2]),399                         set([p_trigger, sep_trigger]))400        event1 = compile.Event(compile.parse1('r(x) :- q(x)'), target='alice')401        event2 = compile.Event(compile.parse1('notrigger2(1)'), target='alice')402        self.assertEqual(reg.relevant_triggers([event1, event2]),403                         set([p_trigger]))404    def test_triggers_by_table(self):405        t1 = agnostic.Trigger('p', 
'alice', lambda x: x)406        t2 = agnostic.Trigger('p', 'alice', lambda x, y: x)407        t3 = agnostic.Trigger('q', 'alice', lambda x: x)408        triggers = [t1, t2, t3]409        table_triggers = agnostic.TriggerRegistry.triggers_by_table(triggers)410        self.assertEqual(2, len(table_triggers))411        self.assertEqual(set(table_triggers[('p', 'alice', None)]),412                         set([t1, t2]))413        self.assertEqual(set(table_triggers[('q', 'alice', None)]),414                         set([t3]))415    def test_modals(self):416        g = compile.RuleDependencyGraph()417        reg = agnostic.TriggerRegistry(g)418        # register419        p_trigger = reg.register_table('p', 'alice', self.f, modal='exec')420        triggers = reg.relevant_triggers(['alice:p'])421        self.assertEqual(triggers, set([p_trigger]))422        # register 2nd table423        q_trigger = reg.register_table('q', 'alice', self.f)424        p_triggers = reg.relevant_triggers(['alice:p'])425        self.assertEqual(p_triggers, set([p_trigger]))426        q_triggers = reg.relevant_triggers(['alice:q'])427        self.assertEqual(q_triggers, set([q_trigger]))428class TestTriggers(base.TestCase):429    class MyObject(object):430        """A class with methods that have side-effects."""431        def __init__(self):432            self.value = 0433            self.equals = False434        def increment(self):435            """Used for counting number of times function invoked."""436            self.value += 1437        def equal(self, realold, realnew, old, new):438            """Used for checking if function is invoked with correct args."""439            self.equals = (realold == old and realnew == new)440    def test_empty(self):441        obj = self.MyObject()442        run = agnostic.Runtime()443        run.create_policy('test')444        run.register_trigger('p', lambda tbl, old, new: obj.increment())445        run.insert('p(1)')446        self.assertEqual(1, 
obj.value)447    def test_empty2(self):448        obj = self.MyObject()449        run = agnostic.Runtime()450        run.create_policy('test')451        run.insert('p(1)')452        run.register_trigger('p', lambda tbl, old, new: obj.increment())453        run.delete('p(1)')454        self.assertEqual(1, obj.value)455    def test_empty3(self):456        obj = self.MyObject()457        run = agnostic.Runtime()458        run.create_policy('test')459        run.insert('p(1)')460        run.delete('p(1)')461        run.register_trigger('p', lambda tbl, old, new: obj.increment())462        run.delete('p(1)')463        self.assertEqual(0, obj.value)464    def test_nochange(self):465        obj = self.MyObject()466        run = agnostic.Runtime()467        run.create_policy('test')468        run.insert('p(1)')469        run.register_trigger('p', lambda tbl, old, new: obj.increment())470        run.insert('p(1)')471        self.assertEqual(0, obj.value)472    def test_batch_change(self):473        obj = self.MyObject()474        run = agnostic.Runtime()475        run.create_policy('test')476        run.register_trigger('p', lambda tbl, old, new: obj.increment())477        p1 = compile.parse1('p(1)')478        result = run.update([compile.Event(p1, target='test')])479        self.assertTrue(result[0], ("Update failed with errors: " +480                                    ";".join(str(x) for x in result[1])))481        self.assertEqual(1, obj.value)482    def test_dependency(self):483        obj = self.MyObject()484        run = agnostic.Runtime()485        run.create_policy('test')486        run.insert('p(x) :- q(x)')487        run.register_trigger('p', lambda tbl, old, new: obj.increment())488        run.insert('q(1)')489        self.assertEqual(1, obj.value)490    def test_dependency_batch_insert(self):491        obj = self.MyObject()492        run = agnostic.Runtime()493        run.create_policy('test')494        run.register_trigger('p', lambda tbl, old, new: 
obj.increment())495        run.insert('q(1)   p(x) :- q(x)')496        self.assertEqual(1, obj.value)497    def test_dependency_batch(self):498        obj = self.MyObject()499        run = agnostic.Runtime()500        run.create_policy('test')501        run.insert('p(x) :- q(x)')502        run.register_trigger('p', lambda tbl, old, new: obj.increment())503        rule = compile.parse1('q(x) :- r(x)')504        data = compile.parse1('r(1)')505        run.update([compile.Event(rule, target='test'),506                    compile.Event(data, target='test')])507        self.assertEqual(1, obj.value)508    def test_dependency_batch_delete(self):509        obj = self.MyObject()510        run = agnostic.Runtime()511        run.create_policy('test')512        run.insert('p(x) :- q(x)')513        run.insert('q(x) :- r(x)')514        run.insert('r(1)')515        run.register_trigger('p', lambda tbl, old, new: obj.increment())516        run.delete('q(x) :- r(x)')517        self.assertEqual(1, obj.value)518    def test_multi_dependency(self):519        obj = self.MyObject()520        run = agnostic.Runtime()521        run.create_policy('test')522        run.insert('p(x) :- q(x)')523        run.insert('q(x) :- r(x), s(x)')524        run.insert('s(1)')525        run.register_trigger('p', lambda tbl, old, new: obj.increment())526        run.insert('r(1)')527        self.assertEqual(1, obj.value)528    def test_negation(self):529        obj = self.MyObject()530        run = agnostic.Runtime()531        run.create_policy('test')532        run.insert('p(x) :- q(x), not r(x)')533        run.insert('q(1)')534        run.insert('q(2)')535        run.insert('r(2)')536        run.register_trigger('p', lambda tbl, old, new: obj.increment())537        run.insert('r(1)')538        self.assertEqual(1, obj.value)539        run.register_trigger('p', lambda tbl, old, new: obj.increment())540        run.delete('r(1)')541        self.assertEqual(3, obj.value)542    def 
test_anti_dependency(self):543        obj = self.MyObject()544        run = agnostic.Runtime()545        run.create_policy('test')546        run.insert('p(x) :- q(x)')547        run.insert('r(1)')548        run.register_trigger('r', lambda tbl, old, new: obj.increment())549        run.insert('q(1)')550        self.assertEqual(0, obj.value)551    def test_old_new_correctness(self):552        obj = self.MyObject()553        run = agnostic.Runtime()554        run.create_policy('test')555        run.insert('p(x) :- q(x)')556        run.insert('q(x) :- r(x), not s(x)')557        run.insert('r(1) r(2) r(3)')558        run.insert('s(2)')559        oldp = set(compile.parse('p(1) p(3)'))560        newp = set(compile.parse('p(1) p(2)'))561        run.register_trigger('p',562                             lambda tbl, old, new:563                             obj.equal(oldp, newp, old, new))564        run.update([compile.Event(compile.parse1('s(3)')),565                    compile.Event(compile.parse1('s(2)'), insert=False)])566        self.assertTrue(obj.equals)567    def test_unregister(self):568        obj = self.MyObject()569        run = agnostic.Runtime()570        run.create_policy('test')571        trigger = run.register_trigger('p',572                                       lambda tbl, old, new: obj.increment())573        run.insert('p(1)')574        self.assertEqual(1, obj.value)575        run.unregister_trigger(trigger)576        self.assertEqual(1, obj.value)577        run.insert('p(2)')578        self.assertEqual(1, obj.value)579        self.assertRaises(KeyError, run.unregister_trigger, trigger)580        self.assertEqual(1, obj.value)581    def test_sequence(self):582        obj = self.MyObject()583        run = agnostic.Runtime()584        run.create_policy('test')585        run.register_trigger('p', lambda tbl, old, new: obj.increment())586        run.insert('p(x) :- q(x)')587        run.insert('q(1)')588        self.assertEqual(1, obj.value)589    def 
test_delete_data(self):590        obj = self.MyObject()591        run = agnostic.Runtime()592        run.create_policy('test')593        run.register_trigger('p', lambda tbl, old, new: obj.increment())594        run.insert('p(x) :- q(x, y), equal(y, 1)')595        run.insert('q(1, 1)')596        self.assertEqual(1, obj.value)597        run.delete('q(1, 1)')598        self.assertEqual(2, obj.value)599    def test_multi_policies(self):600        obj = self.MyObject()601        run = agnostic.Runtime()602        run.debug_mode()603        run.create_policy('alice')604        run.create_policy('bob')605        run.register_trigger('p',606                             lambda tbl, old, new: obj.increment(), 'alice')607        run.insert('p(x) :- bob:q(x)', target='alice')608        run.insert('q(1)', target='bob')609        self.assertEqual(1, obj.value)610        run.delete('q(1)', target='bob')611        self.assertEqual(2, obj.value)612    def test_modal(self):613        obj = self.MyObject()614        run = agnostic.Runtime()615        run.debug_mode()616        run.create_policy('alice')617        run.register_trigger('p', lambda tbl, old, new:618                             obj.increment(), 'alice', 'execute')619        run.insert('execute[p(x)] :- q(x)')620        self.assertEqual(0, obj.value)621        run.insert('q(1)')622        self.assertEqual(1, obj.value)623        run.insert('q(2)')624        self.assertEqual(2, obj.value)625    def test_initialize(self):626        obj = self.MyObject()627        run = agnostic.Runtime()628        run.debug_mode()629        run.create_policy('alice')630        run.register_trigger('p', lambda tbl, old, new:631                             obj.increment(), 'alice', 'execute')632        run.insert('execute[p(x)] :- q(x)')633        self.assertEqual(obj.value, 0)634        run.initialize_tables(['q'], [compile.Fact('q', [1])], 'alice')635        self.assertEqual(obj.value, 1)636class TestMultipolicyRules(base.TestCase):637    
def test_external(self):638        """Test ability to write rules that span multiple policies."""639        # External theory640        run = agnostic.Runtime()641        run.create_policy('test1')642        run.insert('q(1)', target='test1')643        run.insert('q(2)', target='test1')644        run.create_policy('test2')645        run.insert('p(x) :- test1:q(x)', target='test2')646        actual = run.select('p(x)', target='test2')647        e = helper.db_equal('p(1) p(2)', actual)648        self.assertTrue(e, "Basic")649    def test_multi_external(self):650        """Test multiple rules that span multiple policies."""651        run = agnostic.Runtime()652        run.debug_mode()653        run.create_policy('test1')654        run.create_policy('test2')655        run.create_policy('test3')656        run.insert('p(x) :- test2:p(x)', target='test1')657        run.insert('p(x) :- test3:p(x)', target='test1')658        run.insert('p(1)', target='test2')659        run.insert('p(2)', target='test3')660        actual = run.select('p(x)', target='test1')661        e = helper.db_equal(actual, 'p(1) p(2)')662        self.assertTrue(e, "Multiple external rules with multiple policies")663    def test_external_current(self):664        """Test ability to write rules that span multiple policies."""665        # External theory plus current theory666        run = agnostic.Runtime()667        run.create_policy('test1')668        run.insert('q(1)', target='test1')669        run.insert('q(2)', target='test1')670        run.create_policy('test2')671        run.insert('p(x) :- test1:q(x), r(x)', target='test2')672        run.insert('r(1)', target='test2')673        run.insert('r(2)', target='test2')674        actual = run.select('p(x)', target='test2')675        e = helper.db_equal(actual, 'p(1) p(2)')676        self.assertTrue(e, "Mixing external theories with current theory")677    def test_ignore_local(self):678        """Test ability to write rules that span multiple 
policies."""679        # Local table ignored680        run = agnostic.Runtime()681        run.create_policy('test1')682        run.insert('q(1)', target='test1')683        run.insert('q(2)', target='test1')684        run.create_policy('test2')685        run.insert('p(x) :- test1:q(x), r(x)', target='test2')686        run.insert('q(3)', 'test2')687        run.insert('r(1)', target='test2')688        run.insert('r(2)', target='test2')689        run.insert('r(3)', target='test2')690        actual = run.select('p(x)', target='test2')691        e = helper.db_equal(actual, 'p(1) p(2)')692        self.assertTrue(e, "Local table ignored")693    def test_local(self):694        """Test ability to write rules that span multiple policies."""695        # Local table used696        run = agnostic.Runtime()697        run.create_policy('test1')698        run.insert('q(1)', target='test1')699        run.insert('q(2)', target='test1')700        run.create_policy('test2')701        run.insert('p(x) :- test1:q(x), q(x)', target='test2')702        run.insert('q(2)', 'test2')703        actual = run.select('p(x)', target='test2')704        e = helper.db_equal(actual, 'p(2)')705        self.assertTrue(e, "Local table used")706    def test_multiple_external(self):707        """Test ability to write rules that span multiple policies."""708        # Multiple external theories709        run = agnostic.Runtime()710        run.create_policy('test1')711        run.insert('q(1)', target='test1')712        run.insert('q(2)', target='test1')713        run.insert('q(3)', target='test1')714        run.create_policy('test2')715        run.insert('q(1)', target='test2')716        run.insert('q(2)', target='test2')717        run.insert('q(4)', target='test2')718        run.create_policy('test3')719        run.insert('p(x) :- test1:q(x), test2:q(x)', target='test3')720        actual = run.select('p(x)', target='test3')721        e = helper.db_equal(actual, 'p(1) p(2)')722        self.assertTrue(e, 
"Multiple external theories")723    def test_multiple_levels_external(self):724        """Test ability to write rules that span multiple policies."""725        # Multiple levels of external theories726        run = agnostic.Runtime()727        run.debug_mode()728        run.create_policy('test1')729        run.insert('p(x) :- test2:q(x), test3:q(x)', target='test1')730        run.insert('s(3) s(1) s(2) s(4)', target='test1')731        run.create_policy('test2')732        run.insert('q(x) :- test4:r(x)', target='test2')733        run.create_policy('test3')734        run.insert('q(x) :- test1:s(x)', target='test3')735        run.create_policy('test4')736        run.insert('r(1)', target='test4')737        run.insert('r(2)', target='test4')738        run.insert('r(5)', target='test4')739        actual = run.select('p(x)', target='test1')740        e = helper.db_equal(actual, 'p(1) p(2)')741        self.assertTrue(e, "Multiple levels of external theories")742    def test_multipolicy_head(self):743        """Test SELECT with different policy in the head."""744        run = agnostic.Runtime()745        run.debug_mode()746        run.create_policy('test1', kind='action')747        run.create_policy('test2', kind='action')748        (permitted, errors) = run.insert('test2:p+(x) :- q(x)', 'test1')749        self.assertTrue(permitted, "modals with policy names must be allowed")750        run.insert('q(1)', 'test1')751        run.insert('p(2)', 'test2')752        actual = run.select('test2:p+(x)', 'test1')753        e = helper.db_equal(actual, 'test2:p+(1)')754        self.assertTrue(e, "Policy name in the head")755    def test_multipolicy_normal_errors(self):756        """Test errors arising from rules in multiple policies."""757        run = agnostic.Runtime()758        run.debug_mode()759        run.create_policy('test1')760        # policy in head of rule761        (permitted, errors) = run.insert('test2:p(x) :- q(x)', 'test1')762        self.assertFalse(permitted)763     
   self.assertIn("should not reference any policy", str(errors[0]))764        # policy in head of rule with update765        (permitted, errors) = run.insert('test2:p+(x) :- q(x)', 'test1')766        self.assertFalse(permitted)767        self.assertIn("should not reference any policy", str(errors[0]))768        # policy in head of rule with update769        (permitted, errors) = run.insert('test2:p-(x) :- q(x)', 'test1')770        self.assertFalse(permitted)771        self.assertIn("should not reference any policy", str(errors[0]))772        # policy in head of fact773        (permitted, errors) = run.insert('test2:p(1)', 'test1')774        self.assertFalse(permitted)775        self.assertIn("should not reference any policy", str(errors[0]))776        # policy in head of fact777        (permitted, errors) = run.insert('test2:p+(1)', 'test1')778        self.assertFalse(permitted)779        self.assertIn("should not reference any policy", str(errors[0]))780        # policy in head of fact781        (permitted, errors) = run.insert('test2:p-(1)', 'test1')782        self.assertFalse(permitted)783        self.assertIn("should not reference any policy", str(errors[0]))784        # recursion across policies785        run.insert('p(x) :- test2:q(x)', target='test1')786        run.create_policy('test2')787        (permit, errors) = run.insert('q(x) :- test1:p(x)', target='test2')788        self.assertFalse(permit, "Recursion across theories should fail")789        self.assertEqual(len(errors), 1)790        self.assertIn("Rules are recursive", str(errors[0]))791    def test_multipolicy_action_errors(self):792        """Test errors arising from rules in action policies."""793        run = agnostic.Runtime()794        run.debug_mode()795        run.create_policy('test1', kind='action')796        # policy in head of rule797        (permitted, errors) = run.insert('test2:p(x) :- q(x)', 'test1')798        self.assertFalse(permitted)799        self.assertIn("should not reference 
any policy", str(errors[0]))800        # policy in head of fact801        (permitted, errors) = run.insert('test2:p(1)', 'test1')802        self.assertFalse(permitted)803        self.assertIn("should not reference any policy", str(errors[0]))804        # recursion across policies805        run.insert('p(x) :- test2:q(x)', target='test1')806        run.create_policy('test2')807        (permit, errors) = run.insert('q(x) :- test1:p(x)', target='test2')808        self.assertFalse(permit, "Recursion across theories should fail")809        self.assertEqual(len(errors), 1)810        self.assertIn("Rules are recursive", str(errors[0]))811    def test_dependency_graph_policy_deletion(self):812        run = agnostic.Runtime()813        g = run.global_dependency_graph814        run.create_policy('test')815        rule = 'execute[nova:flavors.delete(id)] :- nova:flavors(id)'816        permitted, changes = run.insert(rule, target='test')817        self.assertTrue(permitted)818        run.create_policy('nova')819        run.insert('flavors(1)', target="nova")820        run.insert('flavors(2)', target="nova")821        run.insert('flavors(3)', target="nova")822        run.insert('flavors(4)', target="nova")823        self.assertEqual(g.dependencies('test:nova:flavors.delete'),824                         set(['nova:flavors', 'test:nova:flavors.delete']))825        run.delete_policy('nova')826        self.assertTrue(g.node_in('nova:flavors'))827        self.assertEqual(g.dependencies('test:nova:flavors.delete'),828                         set(['nova:flavors', 'test:nova:flavors.delete']))829    def test_dependency_graph(self):830        """Test that dependency graph gets updated correctly."""831        run = agnostic.Runtime()832        run.debug_mode()833        g = run.global_dependency_graph834        run.create_policy('test')835        run.insert('p(x) :- q(x), nova:q(x)', target='test')836        self.assertTrue(g.edge_in('test:p', 'nova:q', False))837        
self.assertTrue(g.edge_in('test:p', 'test:q', False))838        run.insert('p(x) :- s(x)', target='test')839        self.assertTrue(g.edge_in('test:p', 'nova:q', False))840        self.assertTrue(g.edge_in('test:p', 'test:q', False))841        self.assertTrue(g.edge_in('test:p', 'test:s', False))842        run.insert('q(x) :- nova:r(x)', target='test')843        self.assertTrue(g.edge_in('test:p', 'nova:q', False))844        self.assertTrue(g.edge_in('test:p', 'test:q', False))845        self.assertTrue(g.edge_in('test:p', 'test:s', False))846        self.assertTrue(g.edge_in('test:q', 'nova:r', False))847        run.delete('p(x) :- q(x), nova:q(x)', target='test')848        self.assertTrue(g.edge_in('test:p', 'test:s', False))849        self.assertTrue(g.edge_in('test:q', 'nova:r', False))850        run.update([compile.Event(helper.str2form('p(x) :- q(x), nova:q(x)'),851                                  target='test')])852        self.assertTrue(g.edge_in('test:p', 'nova:q', False))853        self.assertTrue(g.edge_in('test:p', 'test:q', False))854        self.assertTrue(g.edge_in('test:p', 'test:s', False))855        self.assertTrue(g.edge_in('test:q', 'nova:r', False))856    def test_negation(self):857        """Test that negation when applied to a different policy works."""858        run = agnostic.Runtime()859        run.debug_mode()860        run.create_policy('alpha')861        run.create_policy('beta')862        run.insert('p(x) :- beta:q(x), not beta:q(x)', 'alpha')863        run.insert('q(1)', 'beta')864        self.assertEqual(run.select('p(x)', 'alpha'), '')865    def test_built_in(self):866        """Test that built_in function works."""867        run = agnostic.Runtime()868        run.debug_mode()869        run.create_policy('alpha')870        run.create_policy('beta')871        run.create_policy('sigma')872        run.insert('p(x1, x2) :- '873                   'beta:q(x1), sigma:r(x2), not equal(x1, x2)', 'alpha')874        run.insert('q(1)', 
'beta')875        run.insert('r(1)', 'sigma')876        run.insert('r(3)', 'sigma')877        self.assertEqual(run.select('p(x1,x2)', 'alpha'), 'p(1, 3)')878    def test_schema_check(self):879        """Test that schema check in multiple policies works."""880        run = agnostic.Runtime()881        run.debug_mode()882        run.create_policy('alpha')883        run.create_policy('beta')884        run.insert('p(x,y) :- beta:q(x,y)', 'alpha')885        permitted, changes = run.insert('q(x) :- r(x)', 'beta')886        self.assertFalse(permitted)887        self.assertEqual(len(changes), 1)888    def test_same_rules(self):889        """Test that same rule insertion can be correctly dealt with."""890        run = agnostic.Runtime()891        run.debug_mode()892        policy = 'alpha'893        run.create_policy(policy)894        rulestr = 'p(x,y) :- q(x,y)'895        rule = compile.parse1(rulestr)896        run.insert(rulestr, policy)897        self.assertIn(rule, run.policy_object(policy))898        self.assertIn(899            rule.head.table.table, run.policy_object(policy).schema)900        run.insert(rulestr, policy)901        run.delete(rulestr, policy)902        self.assertFalse(rule in run.policy_object(policy))903        self.assertFalse(904            rule.head.table.table in run.policy_object(policy).schema)905class TestSelect(base.TestCase):906    def test_no_dups(self):907        run = agnostic.Runtime()908        run.create_policy('test')909        run.insert('p(x) :- q(x)')910        run.insert('p(x) :- r(x)')911        run.insert('q(1)')912        run.insert('r(1)')913        self.assertEqual(run.select('p(x)'), 'p(1)')914class TestPolicyCreationDeletion(base.TestCase):915    def test_policy_creation_after_ref(self):916        """Test ability to write rules that span multiple policies."""917        # Local table used918        run = agnostic.Runtime()919        run.create_policy('test1')920        run.insert('p(x) :- test2:q(x)', 'test1')921        
run.create_policy('test2')922        run.insert('q(1)', 'test2')923        actual = run.select('p(x)', 'test1')924        e = helper.db_equal(actual, 'p(1)')925        self.assertTrue(e, "Creation after reference")926    def test_policy_deletion_after_ref(self):927        """Test ability to write rules that span multiple policies."""928        # Local table used929        run = agnostic.Runtime()930        run.create_policy('test1')931        run.insert('p(x) :- test2:q(x)', 'test1')932        # ensuring this code runs, without causing an error933        run.create_policy('test2')934        run.delete_policy('test2')935        # add the policy back, this time checking for dangling refs936        run.create_policy('test2')937        self.assertRaises(exception.DanglingReference, run.delete_policy,938                          'test2', disallow_dangling_refs=True)939    def test_policy_deletion_dependency_graph(self):940        """Ensure dependency graph is properly updated when deleting policy."""941        run = agnostic.Runtime()942        run.create_policy('alice')943        run.insert('p(x) :- q(x)')944        LOG.info("graph: \n%s", run.global_dependency_graph)945        self.assertTrue(run.global_dependency_graph.edge_in(946            'alice:p', 'alice:q', False))947        # don't delete rules first--just delete policy948        run.delete_policy('alice')949        self.assertEqual(len(run.global_dependency_graph), 0)950class TestDependencyGraph(base.TestCase):951    def test_fact_insert(self):952        run = agnostic.Runtime()953        run.create_policy('test')954        facts = [compile.Fact('p', [1])]955        run.initialize_tables([], facts)956        self.assertFalse(run.global_dependency_graph.node_in('test:p'))957    def test_atom_insert(self):958        run = agnostic.Runtime()959        run.create_policy('test')960        run.insert('p(1)')961        self.assertFalse(run.global_dependency_graph.node_in('test:p'))962    def test_rule_noop(self):963 
       run = agnostic.Runtime()964        run.create_policy('test')965        run.insert('q(1) :- p(1)')966        run.delete('q(2) :- p(2)')967        self.assertTrue(run.global_dependency_graph.node_in('test:p'))968        self.assertTrue(run.global_dependency_graph.node_in('test:q'))969        self.assertTrue(run.global_dependency_graph.edge_in(970            'test:q', 'test:p', False))971    def test_atom_deletion(self):972        run = agnostic.Runtime()973        run.create_policy('test')974        run.insert('q(x) :- p(x)')975        run.delete('p(1)')976        run.delete('p(1)')977        # actually just testing that no error is thrown978        self.assertFalse(run.global_dependency_graph.has_cycle())979class TestSimulate(base.TestCase):980    DEFAULT_THEORY = 'test_default'981    ACTION_THEORY = 'test_action'982    def prep_runtime(self, code=None, msg=None, target=None, theories=None):983        if code is None:984            code = ""985        if target is None:986            target = self.DEFAULT_THEORY987        run = agnostic.Runtime()988        run.create_policy(self.DEFAULT_THEORY, abbr='default')989        run.create_policy(self.ACTION_THEORY, abbr='action', kind='action')990        if theories:991            for theory in theories:992                run.create_policy(theory)993        run.debug_mode()994        run.insert(code, target=target)995        return run996    def create(self, action_code, class_code, theories=None):997        run = self.prep_runtime(theories=theories)998        actth = self.ACTION_THEORY999        permitted, errors = run.insert(action_code, target=actth)1000        self.assertTrue(permitted, "Error in action policy: {}".format(1001            utility.iterstr(errors)))1002        defth = self.DEFAULT_THEORY1003        permitted, errors = run.insert(class_code, target=defth)1004        self.assertTrue(permitted, "Error in classifier policy: {}".format(1005            utility.iterstr(errors)))1006        return run1007   
 def check(self, run, action_sequence, query, correct, msg, delta=False):1008        original_db = str(run.theory[self.DEFAULT_THEORY])1009        actual = run.simulate(1010            query, self.DEFAULT_THEORY, action_sequence,1011            self.ACTION_THEORY, delta=delta)1012        e = helper.datalog_equal(actual, correct)1013        self.assertTrue(e, msg + " (Query results not correct)")1014        e = helper.db_equal(1015            str(run.theory[self.DEFAULT_THEORY]), original_db)1016        self.assertTrue(e, msg + " (Rollback failed)")1017    def test_multipolicy_state_1(self):1018        """Test update sequence affecting datasources."""1019        run = self.prep_runtime(theories=['nova', 'neutron'])1020        run.insert('p(x) :- nova:p(x)', self.DEFAULT_THEORY)1021        sequence = 'nova:p+(1) neutron:p+(2)'1022        self.check(run, sequence, 'p(x)', 'p(1)', 'Separate theories')1023    def test_multipolicy_state_2(self):1024        """Test update sequence affecting datasources."""1025        run = self.prep_runtime(theories=['nova', 'neutron'])1026        run.insert('p(x) :- neutron:p(x)', self.DEFAULT_THEORY)1027        run.insert('p(x) :- nova:p(x)', self.DEFAULT_THEORY)1028        sequence = 'nova:p+(1) neutron:p+(2)'1029        self.check(run, sequence, 'p(x)', 'p(1) p(2)', 'Separate theories 2')1030    def test_multipolicy_state_3(self):1031        """Test update sequence affecting datasources."""1032        run = self.prep_runtime(theories=['nova', 'neutron'])1033        run.insert('p(x) :- neutron:p(x)', self.DEFAULT_THEORY)1034        run.insert('p(x) :- nova:p(x)', self.DEFAULT_THEORY)1035        run.insert('p(1)', 'nova')1036        sequence = 'nova:p+(1) neutron:p+(2)'1037        self.check(run, sequence, 'p(x)', 'p(1) p(2)', 'Separate theories 3')1038        self.check(run, '', 'p(x)', 'p(1)', 'Existing data separate theories')1039    def test_multipolicy_action_sequence(self):1040        """Test sequence updates with actions that 
impact multiple policies."""1041        action_code = ('nova:p+(x) :- q(x)'1042                       'neutron:p+(y) :- q(x), plus(x, 1, y)'1043                       'ceilometer:p+(y) :- q(x), plus(x, 5, y)'1044                       'action("q")')1045        classify_code = 'p(x) :- nova:p(x)  p(x) :- neutron:p(x) p(3)'1046        run = self.create(action_code, classify_code,1047                          theories=['nova', 'neutron', 'ceilometer'])1048        action_sequence = 'q(1)'1049        self.check(run, action_sequence, 'p(x)', 'p(1) p(2) p(3)',1050                   'Multi-policy actions')1051    def test_action_sequence(self):1052        """Test sequence updates with actions."""1053        # Simple1054        action_code = ('p+(x) :- q(x) action("q")')1055        classify_code = 'p(2)'  # just some other data present1056        run = self.create(action_code, classify_code)1057        action_sequence = 'q(1)'1058        self.check(run, action_sequence, 'p(x)', 'p(1) p(2)', 'Simple')1059        # Noop does not break rollback1060        action_code = ('p-(x) :- q(x)'1061                       'action("q")')1062        classify_code = ('')1063        run = self.create(action_code, classify_code)1064        action_sequence = 'q(1)'1065        self.check(run, action_sequence, 'p(x)', '',1066                   "Rollback handles Noop")1067        # Add and delete1068        action_code = ('action("act") '1069                       'p+(x) :- act(x) '1070                       'p-(y) :- act(x), r(x, y) ')1071        classify_code = 'p(2) r(1, 2)'1072        run = self.create(action_code, classify_code)1073        action_sequence = 'act(1)'1074        self.check(run, action_sequence, 'p(x)', 'p(1)', 'Add and delete')1075        # insertion takes precedence over deletion1076        action_code = ('p+(x) :- q(x)'1077                       'p-(x) :- r(x)'1078                       'action("q")')1079        classify_code = ('')1080        run = self.create(action_code, 
classify_code)1081        # ordered so that consequences will be p+(1) p-(1)1082        action_sequence = 'q(1), r(1) :- true'1083        self.check(run, action_sequence, 'p(x)', 'p(1)',1084                   "Deletion before insertion")1085        # multiple action sequences 11086        action_code = ('p+(x) :- q(x)'1087                       'p-(x) :- r(x)'1088                       'action("q")'1089                       'action("r")')1090        classify_code = ('')1091        run = self.create(action_code, classify_code)1092        action_sequence = 'q(1) r(1)'1093        self.check(run, action_sequence, 'p(x)', '',1094                   "Multiple actions: inversion from {}")1095        # multiple action sequences 21096        action_code = ('p+(x) :- q(x)'1097                       'p-(x) :- r(x)'1098                       'action("q")'1099                       'action("r")')1100        classify_code = ('p(1)')1101        run = self.create(action_code, classify_code)1102        action_sequence = 'q(1) r(1)'1103        self.check(run, action_sequence, 'p(x)', '',1104                   "Multiple actions: inversion from p(1), first is noop")1105        # multiple action sequences 31106        action_code = ('p+(x) :- q(x)'1107                       'p-(x) :- r(x)'1108                       'action("q")'1109                       'action("r")')1110        classify_code = ('p(1)')1111        run = self.create(action_code, classify_code)1112        action_sequence = 'r(1) q(1)'1113        self.check(run, action_sequence, 'p(x)', 'p(1)',1114                   "Multiple actions: inversion from p(1), first is not noop")1115        # multiple action sequences 41116        action_code = ('p+(x) :- q(x)'1117                       'p-(x) :- r(x)'1118                       'action("q")'1119                       'action("r")')1120        classify_code = ('')1121        run = self.create(action_code, classify_code)1122        action_sequence = 'r(1) q(1)'1123        
self.check(run, action_sequence, 'p(x)', 'p(1)',1124                   "Multiple actions: inversion from {}, first is not noop")1125        # Action with additional info1126        action_code = ('p+(x,z) :- q(x,y), r(y,z)'1127                       'action("q") action("r")')1128        classify_code = 'p(1,2)'1129        run = self.create(action_code, classify_code)1130        action_sequence = 'q(1,2), r(2,3) :- true'1131        self.check(run, action_sequence, 'p(x,y)', 'p(1,2) p(1,3)',1132                   'Action with additional info')1133    def test_state_rule_sequence(self):1134        """Test state and rule update sequences."""1135        # State update1136        action_code = ''1137        classify_code = 'p(1)'1138        run = self.create(action_code, classify_code)1139        action_sequence = 'p+(2)'1140        self.check(run, action_sequence, 'p(x)', 'p(1) p(2)',1141                   'State update')1142        # Rule update1143        action_code = ''1144        classify_code = 'q(1)'1145        run = self.create(action_code, classify_code)1146        action_sequence = 'p+(x) :- q(x)'1147        self.check(run, action_sequence, 'p(x)', 'p(1)',1148                   'Rule update')1149    def test_complex_sequence(self):1150        """Test more complex sequences of updates."""1151        # action with query1152        action_code = ('p+(x, y) :- q(x, y)'1153                       'action("q")')1154        classify_code = ('r(1)')1155        run = self.create(action_code, classify_code)1156        action_sequence = 'q(x, 0) :- r(x)'1157        self.check(run, action_sequence, 'p(x,y)', 'p(1,0)',1158                   'Action with query')1159        # action sequence with results1160        action_code = ('p+(id, val) :- create(val)'1161                       'p+(id, val) :- update(id, val)'1162                       'p-(id, val) :- update(id, newval), p(id, val)'1163                       'action("create")'1164                       
'action("update")'1165                       'result(x) :- create(val), p+(x,val)')1166        classify_code = 'hasval(val) :- p(x, val)'1167        run = self.create(action_code, classify_code)1168        action_sequence = 'create(0)  update(x,1) :- result(x)'1169        self.check(run, action_sequence, 'hasval(x)', 'hasval(1)',1170                   'Action sequence with results')1171    def test_delta_add(self):1172        """Test when asking for changes in query."""1173        action_code = ('action("q") '1174                       'p+(x) :- q(x) ')1175        classify_code = 'p(2)'  # just some other data present1176        run = self.create(action_code, classify_code)1177        action_sequence = 'q(1)'1178        self.check(run, action_sequence, 'p(x)', 'p+(1)', 'Add',1179                   delta=True)1180    def test_delta_delete(self):1181        """Test when asking for changes in query."""1182        action_code = ('action("q") '1183                       'p-(x) :- q(x) ')1184        classify_code = 'p(1) p(2)'  # p(2): just some other data present1185        run = self.create(action_code, classify_code)1186        action_sequence = 'q(1)'1187        self.check(run, action_sequence, 'p(x)', 'p-(1)', 'Delete',1188                   delta=True)1189    def test_delta_add_delete(self):1190        """Test when asking for changes in query."""1191        action_code = ('action("act") '1192                       'p+(x) :- act(x) '1193                       'p-(y) :- act(x), r(x, y) ')1194        classify_code = 'p(2) r(1, 2) p(3)'  # p(3): just other data present1195        run = self.create(action_code, classify_code)1196        action_sequence = 'act(1)'1197        self.check(run, action_sequence, 'p(x)', 'p+(1) p-(2)',1198                   'Add and delete', delta=True)1199    def test_key_value_schema(self):1200        """Test action of key/value updates."""1201        action_code = (1202            'action("changeAttribute")'1203            
'server_attributes+(uid, name, newvalue) :- '1204            'changeAttribute(uid, name, newvalue) '1205            'server_attributes-(uid, name, oldvalue) :- '1206            ' changeAttribute(uid, name, newvalue), '1207            ' server_attributes(uid, name, oldvalue)')1208        policy = 'error(uid) :- server_attributes(uid, name, 0)'1209        run = self.create(action_code, policy)1210        seq = 'changeAttribute(101, "cpu", 0)'1211        self.check(run, seq, 'error(x)', 'error(101)',1212                   'Basic error')1213        run = self.create(action_code, policy)1214        seq = 'changeAttribute(101, "cpu", 1)'1215        self.check(run, seq, 'error(x)', '',1216                   'Basic non-error')1217        data = ('server_attributes(101, "cpu", 1)')1218        run = self.create(action_code, policy + data)1219        seq = 'changeAttribute(101, "cpu", 0)'1220        self.check(run, seq, 'error(x)', 'error(101)',1221                   'Overwrite existing to cause error')1222        data = ('server_attributes(101, "cpu", 0)')1223        run = self.create(action_code, policy + data)1224        seq = 'changeAttribute(101, "cpu", 1)'1225        self.check(run, seq, 'error(x)', '',1226                   'Overwrite existing to eliminate error')1227        data = ('server_attributes(101, "cpu", 0)'1228                'server_attributes(101, "disk", 0)')1229        run = self.create(action_code, policy + data)1230        seq = 'changeAttribute(101, "cpu", 1)'1231        self.check(run, seq, 'error(x)', 'error(101)',1232                   'Overwrite existing but still error')1233    def test_duplicates(self):1234        run = agnostic.Runtime()1235        run.create_policy('test')1236        run.insert('p(x) :- q(x)')1237        run.insert('p(x) :- r(x)')1238        run.insert('q(1)')1239        run.insert('r(1)')1240        self.assertEqual(run.simulate('p(x)', 'test', '', 'test'), 'p(1)')1241class TestActionExecution(base.TestCase):1242    def 
setUp(self):1243        super(TestActionExecution, self).setUp()1244        self.run = agnostic.DseRuntime('test')1245        self.run.service_exists = mock.MagicMock()1246        self.run.service_exists.return_value = True1247        self.run._rpc = mock.MagicMock()1248    def test_insert_rule_insert_data(self):1249        self.run.create_policy('test')1250        self.run.debug_mode()1251        self.run.insert('execute[p(x)] :- q(x)')1252        self.assertEqual(len(self.run.logger.messages), 0,1253                         "Improper action logged")1254        self.run.insert('q(1)')1255        self.assertEqual(len(self.run.logger.messages), 1,1256                         "No action logged")1257        self.assertEqual(self.run.logger.messages[0], 'Executing test:p(1)')1258        expected_args = ('test', 'p')1259        expected_kwargs = {'args': {'positional': [1]}}1260        args, kwargs = self.run._rpc.call_args_list[0]1261        self.assertEqual(expected_args, args)1262        self.assertEqual(expected_kwargs, kwargs)1263    def test_insert_data_insert_rule(self):1264        run = self.run1265        run.create_policy('test')1266        run.debug_mode()1267        run.insert('q(1)')1268        self.assertEqual(len(run.logger.messages), 0, "Improper action logged")1269        run.insert('execute[p(x)] :- q(x)')1270        self.assertEqual(len(run.logger.messages), 1, "No action logged")1271        self.assertEqual(run.logger.messages[0], 'Executing test:p(1)')1272        expected_args = ('test', 'p')1273        expected_kwargs = {'args': {'positional': [1]}}1274        args, kwargs = run._rpc.call_args_list[0]1275        self.assertEqual(expected_args, args)1276        self.assertEqual(expected_kwargs, kwargs)1277    def test_insert_data_insert_rule_delete_data(self):1278        run = self.run1279        run.create_policy('test')1280        run.debug_mode()1281        run.insert('q(1)')1282        self.assertEqual(len(run.logger.messages), 0, "Improper 
action logged")1283        run.insert('execute[p(x)] :- q(x)')1284        self.assertEqual(len(run.logger.messages), 1, "No action logged")1285        self.assertEqual(run.logger.messages[0], 'Executing test:p(1)')1286        run.delete('q(1)')1287        self.assertEqual(len(run.logger.messages), 1, "Delete failure")1288        self.assertEqual(run.logger.messages[0], 'Executing test:p(1)')1289        expected_args = ('test', 'p')1290        expected_kwargs = {'args': {'positional': [1]}}1291        args, kwargs = run._rpc.call_args_list[0]1292        self.assertEqual(expected_args, args)1293        self.assertEqual(expected_kwargs, kwargs)1294    def test_insert_data_insert_rule_delete_rule(self):1295        run = self.run1296        run.create_policy('test')1297        run.debug_mode()1298        run.insert('q(1)')1299        self.assertEqual(len(run.logger.messages), 0, "Improper action logged")1300        run.insert('execute[p(x)] :- q(x)')1301        self.assertEqual(len(run.logger.messages), 1, "No action logged")1302        self.assertEqual(run.logger.messages[0], 'Executing test:p(1)')1303        run.delete('execute[p(x)] :- q(x)')1304        self.assertEqual(len(run.logger.messages), 1, "Delete failure")1305        self.assertEqual(run.logger.messages[0], 'Executing test:p(1)')1306        expected_args = ('test', 'p')1307        expected_kwargs = {'args': {'positional': [1]}}1308        args, kwargs = run._rpc.call_args_list[0]1309        self.assertEqual(expected_args, args)1310        self.assertEqual(expected_kwargs, kwargs)1311    def test_insert_data_insert_rule_noop_insert(self):1312        run = self.run1313        run.create_policy('test')1314        run.debug_mode()1315        run.insert('q(1)')1316        self.assertEqual(len(run.logger.messages), 0, "Improper action logged")1317        run.insert('execute[p(x)] :- q(x)')1318        self.assertEqual(len(run.logger.messages), 1, "No action logged")1319        
self.assertEqual(run.logger.messages[0], 'Executing test:p(1)')1320        run.insert('q(1)')1321        self.assertEqual(len(run.logger.messages), 1, "Improper action logged")1322        self.assertEqual(run.logger.messages[0], 'Executing test:p(1)')1323        expected_args = ('test', 'p')1324        expected_kwargs = {'args': {'positional': [1]}}1325        args, kwargs = run._rpc.call_args_list[0]1326        self.assertEqual(expected_args, args)1327        self.assertEqual(expected_kwargs, kwargs)1328    def test_disjunction(self):1329        run = self.run1330        run.create_policy('test')1331        run.debug_mode()1332        run.insert('execute[p(x)] :- q(x)')1333        run.insert('execute[p(x)] :- r(x)')1334        self.assertEqual(len(run.logger.messages), 0, "Improper action logged")1335        run.insert('q(1)')1336        self.assertEqual(len(run.logger.messages), 1, "No action logged")1337        self.assertEqual(run.logger.messages[0], 'Executing test:p(1)')1338        run.insert('r(1)')1339        self.assertEqual(len(run.logger.messages), 1, "Improper action logged")1340        self.assertEqual(run.logger.messages[0], 'Executing test:p(1)')1341        expected_args = ('test', 'p')1342        expected_kwargs = {'args': {'positional': [1]}}1343        args, kwargs = run._rpc.call_args_list[0]1344        self.assertEqual(expected_args, args)1345        self.assertEqual(expected_kwargs, kwargs)1346    def test_multiple_instances(self):1347        run = self.run1348        run.create_policy('test')1349        run.debug_mode()1350        run.insert('q(1)')1351        run.insert('q(2)')1352        self.assertEqual(len(run.logger.messages), 0, "Improper action logged")1353        run.insert('execute[p(x)] :- q(x)')1354        self.assertEqual(len(run.logger.messages), 2, "No action logged")1355        actualset = set([u'Executing test:p(1)', u'Executing test:p(2)'])1356        self.assertEqual(actualset, set(run.logger.messages))1357        
expected_args_list = [1358            [('test', 'p'), {'args': {'positional': [1]}}],1359            [('test', 'p'), {'args': {'positional': [2]}}],1360        ]1361        for args, kwargs in run._rpc.call_args_list:1362            self.assertIn([args, kwargs], expected_args_list)1363            expected_args_list.remove([args, kwargs])1364    def test_disabled_execute_action(self):1365        cfg.CONF.set_override('enable_execute_action', False)1366        run = agnostic.DseRuntime('test')1367        run._rpc = mock.MagicMock()1368        run.service_exists = mock.MagicMock()1369        service_name = 'test-service'1370        action = 'non_executable_action'1371        action_args = {'positional': ['p_arg1'],1372                       'named': {'key1': 'value1'}}1373        run.execute_action(service_name, action, action_args)1374        self.assertFalse(run._rpc.called)1375class TestDisabledRules(base.SqlTestCase):1376    """Tests for Runtime's ability to enable/disable rules."""1377    # insertions1378    def test_insert_enabled(self):1379        run = agnostic.Runtime()1380        run.create_policy('test', kind=datalog_base.DATASOURCE_POLICY_TYPE)1381        schema = compile.Schema({'q': ('id', 'name', 'status')})1382        run.set_schema('test', schema)1383        obj = run.policy_object('test')1384        run.insert('p(x) :- q(id=x)')1385        self.assertEqual(len(run.error_events), 0)1386        self.assertEqual(len(run.disabled_events), 0)1387        self.assertEqual(len(obj.content()), 1)1388    def test_insert_disabled(self):1389        run = agnostic.Runtime()1390        run.create_policy('test', kind=datalog_base.DATASOURCE_POLICY_TYPE)1391        obj = run.policy_object('test')1392        run.insert('p(x) :- q(id=x)')1393        self.assertEqual(len(run.disabled_events), 1)1394        self.assertEqual(len(obj.content()), 0)1395    def test_persistent_insert_disabled(self):1396        """Test that persistent_insert_rule errors on 
IncompleteSchemaException1397        When a table schema is not available, named column references are1398        permitted but disabled in non-persistent rule insert to allow for1399        late-arriving schema when importing rules already in DB.1400        This behavior is not necessary in persistent_insert.1401        """1402        run = agnostic.DseRuntime('dse')1403        run.synchronizer = synchronizer.PolicyRuleSynchronizer(1404            run, run.node)1405        run.create_policy('data', kind=datalog_base.DATASOURCE_POLICY_TYPE)1406        run.persistent_create_policy('policy')1407        obj = run.policy_object('policy')1408        run.insert('p(x) :- data:q(id=x)')1409        try:1410            run.persistent_insert_rule('policy', 'p(x) :- data:q(id=x)',1411                                       '', '')1412        except exception.PolicyException as e:1413            self.assertTrue(1414                'Literal data:q(id=x) uses unknown table q'1415                in str(e),1416                'Named column reference on unknown table '1417                'should be disallowed in persistent insert')1418        self.assertEqual(len(run.disabled_events), 0)1419        self.assertEqual(len(obj.content()), 0)1420        try:1421            run.persistent_insert_rule('policy', 'p(x) :- unknown:q(id=x)',1422                                       '', '')1423        except exception.PolicyException as e:1424            self.assertTrue(1425                'Literal unknown:q(id=x) uses named arguments, but the '1426                'schema is unknown.'1427                in str(e),1428                'Named column reference on unknown table '1429                'should be disallowed in persistent insert')1430        self.assertEqual(len(run.disabled_events), 0)1431        self.assertEqual(len(obj.content()), 0)1432    def test_insert_errors(self):1433        run = agnostic.Runtime()1434        run.create_policy('test', 
kind=datalog_base.DATASOURCE_POLICY_TYPE)1435        schema = compile.Schema({'q': ('name', 'status')})1436        run.set_schema('test', schema)1437        obj = run.policy_object('test')1438        permitted, errors = run.insert('p(x) :- q(id=x)')1439        self.assertFalse(permitted)1440        errstring = " ".join(str(x) for x in errors)1441        self.assertIn("column name id does not exist", errstring)1442        self.assertEqual(len(run.error_events), 0)1443        self.assertEqual(len(run.disabled_events), 0)1444        self.assertEqual(len(obj.content()), 0)1445    def test_insert_set_schema_disabled(self):1446        run = agnostic.Runtime()1447        run.create_policy('test', kind=datalog_base.DATASOURCE_POLICY_TYPE)1448        obj = run.policy_object('test')1449        run.insert('p(x) :- q(id=x)')   # rule is disabled1450        self.assertEqual(len(run.disabled_events), 1)1451        schema = compile.Schema({'q': ('id', 'name', 'status')})1452        run.set_schema('test', schema)1453        self.assertEqual(len(run.error_events), 0)1454        self.assertEqual(len(run.disabled_events), 0)1455        self.assertEqual(len(obj.content()), 1)1456    def test_insert_set_schema_disabled_multiple(self):1457        # insert rule that gets disabled1458        run = agnostic.Runtime()1459        run.create_policy('test', kind=datalog_base.DATASOURCE_POLICY_TYPE)1460        run.create_policy('nova', kind=datalog_base.DATASOURCE_POLICY_TYPE)1461        obj = run.policy_object('test')1462        run.insert('p(x) :- q(id=x), nova:r(id=x)', 'test')1463        self.assertEqual(len(run.disabled_events), 1)1464        # set first schema1465        schema = compile.Schema({'q': ('id', 'name', 'status')})1466        run.set_schema('test', schema)1467        self.assertEqual(len(run.error_events), 0)1468        self.assertEqual(len(run.disabled_events), 1)1469        self.assertEqual(len(obj.content()), 0)1470        # set second schema1471        schema = 
compile.Schema({'r': ('id', 'name', 'status')})1472        run.set_schema('nova', schema)1473        self.assertEqual(len(run.error_events), 0)1474        self.assertEqual(len(run.disabled_events), 0)1475        self.assertEqual(len(obj.content()), 1)1476    def test_insert_set_schema_errors(self):1477        run = agnostic.Runtime()1478        run.create_policy('test', kind=datalog_base.DATASOURCE_POLICY_TYPE)1479        obj = run.policy_object('test')1480        run.insert('p(x) :- q(id=x)')   # rule is disabled1481        self.assertEqual(len(run.disabled_events), 1)1482        schema = compile.Schema({'q': ('name', 'status')},)1483        run.set_schema('test', schema)1484        self.assertEqual(len(run.error_events), 1)1485        self.assertEqual(len(run.disabled_events), 0)1486        self.assertEqual(len(obj.content()), 0)1487    def test_insert_inferred_schema_errors(self):1488        run = agnostic.Runtime()1489        run.create_policy('test', kind=datalog_base.DATASOURCE_POLICY_TYPE)1490        run.insert('p(x) :- q(x)')1491        permitted, errs = run.insert('q(1,2)')1492        self.assertFalse(permitted)1493    # deletions1494    def test_delete_enabled(self):1495        run = agnostic.Runtime()1496        run.create_policy('test', kind=datalog_base.DATASOURCE_POLICY_TYPE)1497        schema = compile.Schema({'q': ('id', 'name', 'status')})1498        run.set_schema('test', schema)1499        obj = run.policy_object('test')1500        run.insert('p(x) :- q(id=x)')1501        self.assertEqual(len(obj.content()), 1)1502        run.delete('p(x) :- q(id=x)')1503        self.assertEqual(len(run.error_events), 0)1504        self.assertEqual(len(run.disabled_events), 0)1505        self.assertEqual(len(obj.content()), 0)1506    def test_delete_set_schema_disabled(self):1507        run = agnostic.Runtime()1508        run.create_policy('test', kind=datalog_base.DATASOURCE_POLICY_TYPE)1509        obj = run.policy_object('test')1510        run.insert('p(x) :- 
q(id=x)')1511        run.delete('p(x) :- q(id=x)')1512        self.assertEqual(len(run.disabled_events), 2)1513        self.assertEqual(len(obj.content()), 0)1514        schema = compile.Schema({'q': ('id', 'name', 'status')})1515        run.set_schema('test', schema)1516        self.assertEqual(len(run.disabled_events), 0)1517        self.assertEqual(len(obj.content()), 0)1518    def test_delete_errors(self):1519        run = agnostic.Runtime()1520        run.create_policy('test', kind=datalog_base.DATASOURCE_POLICY_TYPE)1521        schema = compile.Schema({'q': ('name', 'status')})1522        run.set_schema('test', schema)1523        obj = run.policy_object('test')1524        permitted, errors = run.delete('p(x) :- q(id=x)')1525        self.assertFalse(permitted)1526        errstring = " ".join(str(x) for x in errors)1527        self.assertIn("column name id does not exist", errstring)1528        self.assertEqual(len(run.error_events), 0)1529        self.assertEqual(len(run.disabled_events), 0)1530        self.assertEqual(len(obj.content()), 0)1531    def test_delete_set_schema_errors(self):1532        run = agnostic.Runtime()1533        run.create_policy('test', kind=datalog_base.DATASOURCE_POLICY_TYPE)1534        obj = run.policy_object('test')1535        run.delete('p(x) :- q(id=x)')   # rule is disabled1536        self.assertEqual(len(run.disabled_events), 1)1537        schema = compile.Schema({'q': ('name', 'status')})1538        run.set_schema('test', schema)1539        self.assertEqual(len(run.error_events), 1)1540        self.assertEqual(len(run.disabled_events), 0)1541        self.assertEqual(len(obj.content()), 0)1542    # errors in set_schema1543    def test_set_schema_unknown_policy(self):1544        run = agnostic.Runtime()1545        schema = compile.Schema({'q': ('name', 'status')})1546        try:1547            run.set_schema('test', schema)1548            self.fail("Error not thrown on unknown policy")1549        except 
exception.CongressException as e:1550            self.assertIn("not been created", str(e))1551    def test_disallow_schema_change(self):1552        # Ensures that cannot change schema once it is set.1553        # Can be removed once we support schema changes (e.g. for upgrade).1554        run = agnostic.Runtime()1555        run.create_policy('test', kind=datalog_base.DATASOURCE_POLICY_TYPE)1556        schema = compile.Schema({'q': ('name', 'status')})1557        run.set_schema('test', schema)1558        schema = compile.Schema({'q': ('id', 'name', 'status')})1559        try:1560            run.set_schema('test', schema)1561            self.fail("Error not thrown on schema change")1562        except exception.CongressException as e:1563            self.assertIn("Schema for test already set", str(e))1564    def test_insert_without_datasource_policy(self):1565        run = agnostic.Runtime()1566        run.create_policy('beta')   # not datasource policy1567        # exception because col refs over non-datasource policy1568        permitted, errors = run.insert('p(x) :- beta:q(name=x)')1569        self.assertFalse(permitted)1570        self.assertTrue(1571            any("does not reference a datasource policy" in str(e)1572                for e in errors))1573    def test_delete_policy_while_disabled_events_outstanding(self):1574        """Test deleting policy while there are disabled_events outstanding."""1575        run = agnostic.Runtime()1576        # generate disabled event1577        run.create_policy('test', kind=datalog_base.DATASOURCE_POLICY_TYPE)1578        obj = run.policy_object('test')1579        run.insert('p(x) :- q(id=x)')1580        self.assertEqual(len(run.disabled_events), 1)1581        self.assertEqual(len(obj.content()), 0)1582        # create and delete another policy1583        run.create_policy('to_delete')1584        run.delete_policy('to_delete')1585class TestDelegation(base.TestCase):1586    """Tests for Runtime's delegation 
functionality."""1587    def test_subpolicy(self):1588        run = agnostic.Runtime()1589        run.create_policy('test')1590        policy = 'error(x) :- q(x), r(x)'1591        run.insert(policy)1592        subpolicy = run.find_subpolicy(1593            set(['q']), set(), set(['error', 'warning']))1594        e = helper.datalog_equal(subpolicy, policy)1595        self.assertTrue(e)1596    def test_subpolicy_multiple(self):1597        run = agnostic.Runtime()1598        run.create_policy('test')1599        policy = ('error(x) :- q(x), r(x) '1600                  'error(x) :- q(x), s(x) '1601                  'warning(x) :- t(x), q(x)')1602        run.insert(policy)1603        subpolicy = run.find_subpolicy(1604            set(['q']), set(), set(['error', 'warning']))1605        e = helper.datalog_equal(subpolicy, policy)1606        self.assertTrue(e)1607    def test_subpolicy_prohibited(self):1608        run = agnostic.Runtime()1609        run.create_policy('test')1610        policy1 = 'error(x) :- q(x), r(x) '1611        policy2 = 'error(x) :- q(x), s(x) '1612        policy3 = 'error(x) :- q(x), prohibit(x, y) '1613        policy4 = 'warning(x) :- t(x), q(x)'1614        run.insert(policy1 + policy2 + policy3 + policy4)1615        subpolicy = run.find_subpolicy(1616            set(['q']), set(['prohibit']), set(['error', 'warning']))1617        e = helper.datalog_equal(subpolicy, policy1 + policy2 + policy4)1618        self.assertTrue(e)1619    def test_subpolicy_layers(self):1620        run = agnostic.Runtime()1621        run.create_policy('test')1622        policy1 = 'error(x) :- t(x), u(x) '1623        policy2 = '    t(x) :- q(x), s(x) '1624        policy3 = 'error(x) :- p(x) '1625        policy4 = '    p(x) :- prohibit(x, y)'1626        policy5 = 'warning(x) :- t(x), q(x)'1627        run.insert(policy1 + policy2 + policy3 + policy4 + policy5)1628        subpolicy = run.find_subpolicy(1629            set(['q']), set(['prohibit']), set(['error', 
'warning']))1630        e = helper.datalog_equal(subpolicy, policy1 + policy2 + policy5)...test_GetChildDomain.py
Source:test_GetChildDomain.py  
...13    @allure.testcase("ID5392,ç¨ä¾åï¼è·åå·²ç»å®åææ¡£å--éåålimitåæ°æ£åæ£æ¥--è¿å200")14    @allure.testcase("ID5396,ç¨ä¾åï¼è·åå·²ç»å®åææ¡£å--ååstartåæ°æ£åæ£æ¥--è¿å200")15    @allure.testcase("ID5398,ç¨ä¾åï¼è·åå·²ç»å®åææ¡£å--åålimitåæ°æ£åæ£æ¥--è¿å200")16    @pytest.fixture(scope="function")17    def create_policy(self, metadata_host):18        father_host = metadata_host["replace.eisoo.com"]19        child_host1 = metadata_host["child.eisoo.com"]20        child_host2 = metadata_host["self.eisoo.com"]21        father_host = (father_host.split(":")[1]).strip("/")22        child_host1 = (child_host1.split(":")[1]).strip("/")23        child_host2 = (child_host2.split(":")[1]).strip("/")24        # è·åçç¥id25        strategyId = CommonDocPolicyMgnt().AddPolicy('{"content":[{"name":"password_strength_meter",'26                                                     '"value":{"enable":True,"length":22}}],"name":"policy1"}')27        yield strategyId, father_host, child_host1, child_host228        # å é¤çç¥é
ç½®29        dbclose = DB_connect()30        dbclose.delete('delete from domain_mgnt.t_policy_tpls')31        dbclose.delete('delete from domain_mgnt.t_policy_tpl_domains')...22839_test_policy_base.py
Source:22839_test_policy_base.py  
...16from google.auth import credentials17from google.cloud.pubsub_v1 import subscriber18from google.cloud.pubsub_v1 import types19from google.cloud.pubsub_v1.subscriber.policy import thread20def create_policy(flow_control=types.FlowControl()):21    creds = mock.Mock(spec=credentials.Credentials)22    client = subscriber.Client(credentials=creds)23    return thread.Policy(client, 'sub_name_d', flow_control=flow_control)24def test_ack_deadline():25    policy = create_policy()26    assert policy.ack_deadline == 1027    policy.histogram.add(20)28    assert policy.ack_deadline == 2029    policy.histogram.add(10)30    assert policy.ack_deadline == 2031def test_get_initial_request():32    policy = create_policy()33    initial_request = policy.get_initial_request()34    assert isinstance(initial_request, types.StreamingPullRequest)35    assert initial_request.subscription == 'sub_name_d'36    assert initial_request.stream_ack_deadline_seconds == 1037def test_managed_ack_ids():38    policy = create_policy()39    # Ensure we always get a set back, even if the property is not yet set.40    managed_ack_ids = policy.managed_ack_ids41    assert isinstance(managed_ack_ids, set)42    # Ensure that multiple calls give the same actual object back.43    assert managed_ack_ids is policy.managed_ack_ids44def test_subscription():45    policy = create_policy()46    assert policy.subscription == 'sub_name_d'47def test_ack():48    policy = create_policy()49    policy._consumer.active = True50    with mock.patch.object(policy._consumer, 'send_request') as send_request:51        policy.ack('ack_id_string', 20)52        send_request.assert_called_once_with(types.StreamingPullRequest(53            ack_ids=['ack_id_string'],54        ))55    assert len(policy.histogram) == 156    assert 20 in policy.histogram57def test_ack_no_time():58    policy = create_policy()59    policy._consumer.active = True60    with mock.patch.object(policy._consumer, 'send_request') as send_request:61        
policy.ack('ack_id_string')62        send_request.assert_called_once_with(types.StreamingPullRequest(63            ack_ids=['ack_id_string'],64        ))65    assert len(policy.histogram) == 066def test_ack_paused():67    policy = create_policy()68    policy._paused = True69    policy._consumer.active = False70    with mock.patch.object(policy, 'open') as open_:71        policy.ack('ack_id_string')72        open_.assert_called()73    assert 'ack_id_string' in policy._ack_on_resume74def test_call_rpc():75    policy = create_policy()76    with mock.patch.object(policy._client.api, 'streaming_pull') as pull:77        policy.call_rpc(mock.sentinel.GENERATOR)78        pull.assert_called_once_with(mock.sentinel.GENERATOR)79def test_drop():80    policy = create_policy()81    policy.managed_ack_ids.add('ack_id_string')82    policy._bytes = 2083    policy.drop('ack_id_string', 20)84    assert len(policy.managed_ack_ids) == 085    assert policy._bytes == 086    # Do this again to establish idempotency.87    policy.drop('ack_id_string', 20)88    assert len(policy.managed_ack_ids) == 089    assert policy._bytes == 090def test_drop_below_threshold():91    """Establish that we resume a paused subscription.92    If the subscription is paused, and we drop sufficiently below93    the flow control thresholds, it should resume.94    """95    policy = create_policy()96    policy.managed_ack_ids.add('ack_id_string')97    policy._bytes = 2098    policy._paused = True99    with mock.patch.object(policy, 'open') as open_:100        policy.drop(ack_id='ack_id_string', byte_size=20)101        open_.assert_called_once_with(policy._callback)102    assert policy._paused is False103def test_load():104    flow_control = types.FlowControl(max_messages=10, max_bytes=1000)105    policy = create_policy(flow_control=flow_control)106    # This should mean that our messages count is at 10%, and our bytes107    # are at 15%; the ._load property should return the higher (0.15).108    
policy.lease(ack_id='one', byte_size=150)109    assert policy._load == 0.15110    # After this message is added, the messages should be higher at 20%111    # (versus 16% for bytes).112    policy.lease(ack_id='two', byte_size=10)113    assert policy._load == 0.2114    # Returning a number above 100% is fine.115    policy.lease(ack_id='three', byte_size=1000)116    assert policy._load == 1.16117def test_modify_ack_deadline():118    policy = create_policy()119    with mock.patch.object(policy._consumer, 'send_request') as send_request:120        policy.modify_ack_deadline('ack_id_string', 60)121        send_request.assert_called_once_with(types.StreamingPullRequest(122            modify_deadline_ack_ids=['ack_id_string'],123            modify_deadline_seconds=[60],124        ))125def test_maintain_leases_inactive_consumer():126    policy = create_policy()127    policy._consumer.active = False128    assert policy.maintain_leases() is None129def test_maintain_leases_ack_ids():130    policy = create_policy()131    policy._consumer.active = True132    policy.lease('my ack id', 50)133    # Mock the sleep object.134    with mock.patch.object(time, 'sleep', autospec=True) as sleep:135        def trigger_inactive(seconds):136            assert 0 < seconds < 10137            policy._consumer.active = False138        sleep.side_effect = trigger_inactive139        # Also mock the consumer, which sends the request.140        with mock.patch.object(policy._consumer, 'send_request') as send:141            policy.maintain_leases()142            send.assert_called_once_with(types.StreamingPullRequest(143                modify_deadline_ack_ids=['my ack id'],144                modify_deadline_seconds=[10],145            ))146        sleep.assert_called()147def test_maintain_leases_no_ack_ids():148    policy = create_policy()149    policy._consumer.active = True150    with mock.patch.object(time, 'sleep', autospec=True) as sleep:151        def trigger_inactive(seconds):152            
assert 0 < seconds < 10153            policy._consumer.active = False154        sleep.side_effect = trigger_inactive155        policy.maintain_leases()156        sleep.assert_called()157def test_lease():158    policy = create_policy()159    policy.lease(ack_id='ack_id_string', byte_size=20)160    assert len(policy.managed_ack_ids) == 1161    assert policy._bytes == 20162    # Do this again to prove idempotency.163    policy.lease(ack_id='ack_id_string', byte_size=20)164    assert len(policy.managed_ack_ids) == 1165    assert policy._bytes == 20166def test_lease_above_threshold():167    flow_control = types.FlowControl(max_messages=2)168    policy = create_policy(flow_control=flow_control)169    with mock.patch.object(policy, 'close') as close:170        policy.lease(ack_id='first_ack_id', byte_size=20)171        assert close.call_count == 0172        policy.lease(ack_id='second_ack_id', byte_size=25)173        close.assert_called_once_with()174def test_nack():175    policy = create_policy()176    with mock.patch.object(policy, 'modify_ack_deadline') as mad:177        with mock.patch.object(policy, 'drop') as drop:178            policy.nack(ack_id='ack_id_string', byte_size=10)179            drop.assert_called_once_with(ack_id='ack_id_string', byte_size=10)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
