How to use get_plugins method in localstack

Best Python code snippet using localstack_python

test_manager.py

Source:test_manager.py Github

copy

Full Screen

...44 def test_finds_no_plugins_in_empty_directory(self):45 plugin_dir, cleanup = setup_plugins()46 self.addCleanup(cleanup)47 manager = PluginManager(self._app, [plugin_dir])48 self.assertEqual(len(manager.get_plugins()), 0)49 def test_finds_one_plugin_file(self):50 plugin_dir, cleanup = setup_plugins('alpha_plugin.py')51 self.addCleanup(cleanup)52 manager = PluginManager(self._app, [plugin_dir])53 self.assertEqual(list(manager.plugin_files),54 [os.path.join(plugin_dir, 'alpha_plugin.py')])55 def test_finds_one_plugin(self):56 plugin_dir, cleanup = setup_plugins('alpha_plugin.py')57 self.addCleanup(cleanup)58 manager = PluginManager(self._app, [plugin_dir])59 plugins = list(manager.get_plugins())60 self.assertEqual(len(plugins), 1)61 self.assertTrue(isinstance(plugins[0], Plugin))62 def test_plugin_loading_sets_application(self):63 plugin_dir, cleanup = setup_plugins('alpha_plugin.py')64 self.addCleanup(cleanup)65 manager = PluginManager(self._app, [plugin_dir])66 plugins = list(manager.get_plugins())67 self.assertEqual(plugins[0].app, self._app)68 def test_plugin_loading_callback(self):69 callback_calls = []70 def callback(filename, i, total):71 callback_calls.append((os.path.basename(filename), i, total))72 plugin_dir, cleanup = setup_plugins('alpha_plugin.py')73 manager = PluginManager(self._app, [plugin_dir])74 manager.get_plugins(callback=callback)75 self.assertEqual(callback_calls, [('alpha_plugin.py', 0, 1)])76 def test_plugin_loading_callback_with_multiple_plugins(self):77 callback_calls = []78 def callback(filename, i, total):79 callback_calls.append((os.path.basename(filename), i, total))80 plugin_dir, cleanup = setup_plugins(81 'alpha_plugin.py', 'bravo_plugin.py')82 manager = PluginManager(self._app, [plugin_dir])83 manager.get_plugins(callback=callback)84 self.assertEqual(callback_calls, [85 ('alpha_plugin.py', 0, 2),86 ('bravo_plugin.py', 1, 2),87 ])88 def test_condition_equality(self):89 # The first part of the conditions test looks for exactly equality90 # between the condition argument and the plugin's condition91 # attribute.92 plugin_dir, cleanup = setup_plugins(93 'alpha_plugin.py', 'bravo_plugin.py')94 manager = PluginManager(self._app, [plugin_dir])95 # Start by getting all the plugins.96 all_plugins = manager.get_plugins()97 # Set some conditions on the plugins.98 all_plugins[0].condition = 'alpha'99 all_plugins[1].condition = 'bravo'100 self.assertEqual(manager.get_plugins(condition='zero'), [])101 self.assertEqual(manager.get_plugins(condition='alpha'),102 [all_plugins[0]])103 self.assertEqual(manager.get_plugins(condition='bravo'),104 [all_plugins[1]])105 def test_condition_in(self):106 # The second part of the conditions test checks for the given107 # condition being in the sequence of conditions in the plugin. This108 # is kind of crappy because let's say a plugin's condition is109 # 'happy_days' and you pass in condition='happy', you'll get a match.110 # Oh well, it's been this way forever.111 plugin_dir, cleanup = setup_plugins(112 'alpha_plugin.py', 'bravo_plugin.py')113 manager = PluginManager(self._app, [plugin_dir])114 # Start by getting all the plugins.115 all_plugins = manager.get_plugins()116 # Set some conditions on the plugins.117 all_plugins[0].condition = ['alpha', 'happy']118 all_plugins[1].condition = ['bravo', 'happy', 'sad']119 self.assertEqual(manager.get_plugins(condition='zero'), [])120 self.assertEqual(manager.get_plugins(condition='alpha'),121 [all_plugins[0]])122 self.assertEqual(manager.get_plugins(condition='bravo'),123 [all_plugins[1]])124 self.assertEqual(manager.get_plugins(condition='happy'), all_plugins)125 self.assertEqual(manager.get_plugins(condition='sad'),126 [all_plugins[1]])127 def test_condition_wildcard(self):128 # The third conditions test matches everything.129 plugin_dir, cleanup = setup_plugins(130 'alpha_plugin.py', 'bravo_plugin.py', 'charlie_plugin.py')131 manager = PluginManager(self._app, [plugin_dir])132 # Start by getting all the plugins.133 all_plugins = manager.get_plugins()134 self.assertEqual(len(all_plugins), 3)135 # Set some conditions on the plugins.136 all_plugins[0].condition = ['alpha', 'happy']137 all_plugins[1].condition = ['bravo', 'happy', 'sad']138 # Do not give the third plugin an explicit condition.139 self.assertEqual(manager.get_plugins(condition='*'), all_plugins)140 def test_condition_default_matches_conditionless(self):141 # By default, only conditionless plugins match the manager default.142 plugin_dir, cleanup = setup_plugins(143 'alpha_plugin.py', 'bravo_plugin.py', 'charlie_plugin.py')144 manager = PluginManager(self._app, [plugin_dir])145 # Start by getting all the plugins.146 all_plugins = manager.get_plugins()147 self.assertEqual(len(all_plugins), 3)148 # Set some conditions on the plugins.149 all_plugins[0].condition = ['alpha', 'happy']150 all_plugins[1].condition = ['bravo', 'happy', 'sad']151 # Do not give the third plugin an explicit condition....

Full Screen

Full Screen

willy.py

Source:willy.py Github

copy

Full Screen

...22 self.loop.run_until_complete(self.start(*args))23 async def on_ready(self):24 for plugin in self.plugins:25 self.loop.create_task(plugin.on_ready())26 async def get_plugins(self, server):27 plugins = await self.plugin_manager.get_all(server)28 return plugins29 async def send_message(self, *args, **kwargs):30 return await super().send_message(*args, **kwargs)31 async def on_message(self, message):32 if message.channel.is_private:33 return34 server = message.server35 enabled_plugins = await self.get_plugins(server)36 for plugin in enabled_plugins:37 self.loop.create_task(plugin._on_message(message))38 async def on_message_edit(self, before, after):39 if before.channel.is_private:40 return41 server = after.server42 enabled_plugins = await self.get_plugins(server)43 for plugin in enabled_plugins:44 self.loop.create_task(plugin.on_message_edit(before, after))45 async def on_message_delete(self, message):46 if message.channel.is_private:47 return48 server = message.server49 enabled_plugins = await self.get_plugins(server)50 for plugin in enabled_plugins:51 self.loop.create_task(plugin.on_message_delete(message))52 async def on_channel_create(self, channel):53 if channel.is_private:54 return55 server = channel.server56 enabled_plugins = await self.get_plugins(server)57 for plugin in enabled_plugins:58 self.loop.create_task(plugin.on_channel_create(channel))59 async def on_channel_update(self, before, after):60 if before.is_private:61 return62 server = after.server63 enabled_plugins = await self.get_plugins(server)64 for plugin in enabled_plugins:65 self.loop.create_task(plugin.on_channel_update(before, after))66 async def on_channel_delete(self, channel):67 if channel.is_private:68 return69 server = channel.server70 enabled_plugins = await self.get_plugins(server)71 for plugin in enabled_plugins:72 self.loop.create_task(plugin.on_channel_delete(channel))73 async def on_member_join(self, member):74 server = member.server75 enabled_plugins = await self.get_plugins(server)76 for plugin in enabled_plugins:77 self.loop.create_task(plugin.on_member_join(member))78 async def on_member_remove(self, member):79 server = member.server80 enabled_plugins = await self.get_plugins(server)81 for plugin in enabled_plugins:82 self.loop.create_task(plugin.on_member_remove(member))83 async def on_member_update(self, before, after):84 server = after.server85 enabled_plugins = await self.get_plugins(server)86 for plugin in enabled_plugins:87 self.loop.create_task(plugin.on_member_update(before, after))88 async def on_server_update(self, before, after):89 server = after90 enabled_plugins = await self.get_plugins(server)91 for plugin in enabled_plugins:92 self.loop.create_task(plugin.on_server_update(before, after))93 async def on_server_role_create(self, server, role):94 enabled_plugins = await self.get_plugins(server)95 for plugin in enabled_plugins:96 self.loop.create_task(plugin.on_server_role_create(server, role))97 async def on_server_role_delete(self, server, role):98 enabled_plugins = await self.get_plugins(server)99 for plugin in enabled_plugins:100 self.loop.create_task(plugin.on_server_role_delete(server, role))101 async def on_server_role_update(self, before, after):102 server = None103 for s in self.servers:104 if after.id in map(lambda r: r.id, s.roles):105 server = s106 break107 if server is None:108 return109 enabled_plugins = await self.get_plugins(server)110 for plugin in enabled_plugins:111 self.loop.create_task(plugin.on_server_role_update(before, after))112 async def on_voice_state_update(self, before, after):113 if after is None and before is None:114 return115 elif after is None:116 server = before.server117 elif before is None:118 server = after.server119 else:120 server = after.server121 enabled_plugins = await self.get_plugins(server)122 for plugin in enabled_plugins:123 self.loop.create_task(plugin.on_voice_state_update(before, after))124 async def on_member_ban(self, member):125 server = member.server126 enabled_plugins = await self.get_plugins(server)127 for plugin in enabled_plugins:128 self.loop.create_task(plugin.on_member_ban(member))129 async def on_member_unban(self, member):130 server = member.server131 enabled_plugins = await self.get_plugins(server)132 for plugin in enabled_plugins:133 self.loop.create_task(plugin.on_member_unban(member))134 async def on_typing(self, channel, user, when):135 if channel.is_private:136 return137 server = channel.server138 enabled_plugins = await self.get_plugins(server)139 for plugin in enabled_plugins:...

Full Screen

Full Screen

__init__.py

Source:__init__.py Github

copy

Full Screen

...27 'buildslave', # deprecated, use 'worker' instead.28]29# Names here match the names of the corresponding Buildbot module, hence30# 'changes', 'schedulers', but 'buildslave'31changes = get_plugins('changes', IChangeSource)32schedulers = get_plugins('schedulers', IScheduler)33steps = get_plugins('steps', IBuildStep)34util = get_plugins('util', None)35reporters = get_plugins('reporters', None)36secrets = get_plugins('secrets', None)37webhooks = get_plugins('webhooks', None)38# For plugins that are not updated to the new worker names, plus fallback of39# current Buildbot plugins for old configuration files.40buildslave = get_plugins('buildslave', IWorker)41# Worker entry point for new/updated plugins....

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful