How to use _handle_event method in Lemoncheesecake

Best Python code snippet using lemoncheesecake

test_bot.py

Source:test_bot.py Github

copy

Full Screen

...127 self.object.user_manager.set.return_value = test_user128 self.object.get_channel = AsyncMock()129 self.object.api_client.users_info = AsyncMock()130 self.object.api_client.users_info.coro.return_value = test_user_response131 await self.object._handle_event('message', test_payload)132 self.object.user_manager.get.assert_called_with(test_user_id)133 self.object.user_manager.set.assert_called()134 # test _prepare_and_send_output without any command options set (reply in thread, etc.)135 def test_prepare_and_send_output_no_cmd_options(self):136 self.object.send_message = AsyncMock()137 self.object.send_im = mock.Mock()138 self.object.web_client = AsyncMock()139 # async def _prepare_and_send_output(self, cmd, msg, cmd_options, output):140 self.object._prepare_and_send_output(test_command, self.test_event, {}, test_output)141 self.object.send_message.assert_called_with(self.test_event.channel, test_output, thread=test_thread_ts,142 reply_broadcast=None, parse=None)143 # test _prepare_and_send_output with various options144 def test_prepare_and_send_output_with_cmd_options(self):145 self.object.send_message = mock.Mock()...

Full Screen

Full Screen

event_services.py

Source:event_services.py Github

copy

Full Screen

...35 taskqueue_services.defer_to_events_queue(36 jobs_registry.ContinuousComputationEventDispatcher.dispatch_event,37 cls.EVENT_TYPE, *args, **kwargs)38 @classmethod39 def _handle_event(cls, *args, **kwargs):40 """Perform in-request processing of an incoming event."""41 raise NotImplementedError(42 'Subclasses of BaseEventHandler should implement the '43 '_handle_event() method, using explicit arguments '44 '(no *args or **kwargs).')45 @classmethod46 def record(cls, *args, **kwargs):47 """Process incoming events.48 Callers of event handlers should call this method, not _handle_event().49 """50 cls._notify_continuous_computation_listeners_async(*args, **kwargs)51 cls._handle_event(*args, **kwargs)52class AnswerSubmissionEventHandler(BaseEventHandler):53 """Event handler for recording answer submissions."""54 EVENT_TYPE = feconf.EVENT_TYPE_ANSWER_SUBMITTED55 @classmethod56 def _notify_continuous_computation_listeners_async(cls, *args, **kwargs):57 # Disable this method until we can deal with large answers, otherwise58 # the data that is being placed on the task queue is too large.59 pass60 @classmethod61 def _handle_event(cls, exploration_id, exploration_version, state_name,62 handler_name, rule, answer):63 """Records an event when an answer triggers a rule."""64 # TODO(sll): Escape these args?65 stats_models.process_submitted_answer(66 exploration_id, exploration_version, state_name,67 handler_name, rule, answer)68class DefaultRuleAnswerResolutionEventHandler(BaseEventHandler):69 """Event handler for recording resolving of answers triggering the default70 rule."""71 EVENT_TYPE = feconf.EVENT_TYPE_DEFAULT_ANSWER_RESOLVED72 @classmethod73 def _handle_event(cls, exploration_id, state_name, handler_name, answers):74 """Resolves a list of answers for the default rule of this state."""75 # TODO(sll): Escape these args?76 stats_models.resolve_answers(77 exploration_id, state_name, handler_name,78 exp_domain.DEFAULT_RULESPEC_STR, answers)79class StartExplorationEventHandler(BaseEventHandler):80 """Event handler for recording exploration start events."""81 EVENT_TYPE = feconf.EVENT_TYPE_START_EXPLORATION82 @classmethod83 def _handle_event(cls, exp_id, exp_version, state_name, session_id,84 params, play_type):85 stats_models.StartExplorationEventLogEntryModel.create(86 exp_id, exp_version, state_name, session_id, params,87 play_type)88class MaybeLeaveExplorationEventHandler(BaseEventHandler):89 """Event handler for recording exploration leave events."""90 EVENT_TYPE = feconf.EVENT_TYPE_MAYBE_LEAVE_EXPLORATION91 @classmethod92 def _handle_event(93 cls, exp_id, exp_version, state_name, session_id, time_spent,94 params, play_type):95 stats_models.MaybeLeaveExplorationEventLogEntryModel.create(96 exp_id, exp_version, state_name, session_id, time_spent,97 params, play_type)98class StateHitEventHandler(BaseEventHandler):99 """Event handler for recording state hit events."""100 EVENT_TYPE = feconf.EVENT_TYPE_STATE_HIT101 # TODO(sll): remove params before sending this event to the jobs taskqueue102 @classmethod103 def _handle_event(104 cls, exp_id, exp_version, state_name, session_id,105 params, play_type):106 stats_models.StateHitEventLogEntryModel.create(107 exp_id, exp_version, state_name, session_id,108 params, play_type)109class ExplorationContentChangeEventHandler(BaseEventHandler):110 """Event handler for receiving exploration change events. This event is111 triggered whenever changes to an exploration's contents or metadata (title,112 blurb etc.) are persisted. This includes when a a new exploration is113 created.114 """115 EVENT_TYPE = feconf.EVENT_TYPE_EXPLORATION_CHANGE116 @classmethod117 def _handle_event(cls, *args, **kwargs):118 pass119class ExplorationStatusChangeEventHandler(BaseEventHandler):120 """Event handler for receiving exploration status change events.121 These events are triggered whenever an exploration is published,122 publicized, unpublished or unpublicized.123 """124 EVENT_TYPE = feconf.EVENT_TYPE_EXPLORATION_STATUS_CHANGE125 @classmethod126 def _handle_event(cls, *args, **kwargs):127 pass128class Registry(object):129 """Registry of event handlers."""130 # Dict mapping event types to their classes.131 _event_types_to_classes = {}132 @classmethod133 def _refresh_registry(cls):134 """Regenerates the event handler registry."""135 cls._event_types_to_classes.clear()136 # Find all subclasses of BaseEventHandler in the current module.137 for obj_name, obj in globals().iteritems():138 if inspect.isclass(obj) and issubclass(obj, BaseEventHandler):139 if obj_name == 'BaseEventHandler':140 continue...

Full Screen

Full Screen

external.py

Source:external.py Github

copy

Full Screen

...11 """Pass off events to external plugins,12 run as independent processes.13 """14 implements(IPlugin, IHackabotPlugin)15 def _handle_event(self, conn, event):16 if 'internal' in event and event['internal']:17 return18 conf = conn.manager.config19 if event['type'] == "command":20 commands = glob("%s/commands/%s" %21 (conf.get('root'), event['command']))22 else:23 commands = glob("%s/hooks/%s/*" %24 (conf.get('root'), event['type']))25 if not commands:26 return27 vars = os.environ.copy()28 for key, val in conf.items():29 vars["HB_%s" % key.upper()] = str(val)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run Lemoncheesecake automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful