How to use onOpen method in ghostjs

Best JavaScript code snippet using ghostjs

models.py

Source:models.py Github

copy

Full Screen

# flake8: noqa
from django.core.servers.basehttp import FileWrapper
from django.http import HttpResponse, Http404
from envirocon.game.installed_games import InstalledGames, GameInterface
from funding_interventions import funding_points, funding_autoshocks
import os.path

fileroot = os.path.abspath(".") + "/game_many/files/"

# Interventions that, if funded in week 4, are ineligible to be funded in
# week 6.
_WEEK6_INELIGIBLE = ('timber-low',
                     'nomadic-low',
                     'agricultural-low',
                     'desertification-low',
                     'habitat-low',
                     'water-low')

# Teams who choose the 3-point option in any of the first 3 categories
# automatically get the second watching brief variant.
_AUTO_WB2_CHOICES = ('water-supply-low',
                     'sanitation-low',
                     'waste-low')


def _funded_any(funded, choices):
    """Return True when at least one entry of `funded` is in `choices`."""
    return any(intervention in choices for intervention in funded)


class ConflictAssessment(GameInterface):
    """
    Conflict Assessment; offers the Country Narrative PDF once opened.
    """
    def pages(self):
        return ('index', 'page2')

    def template(self, page_id=None, public_state=None):
        """
        Return ('file', <servefile result>) for the downloadable page,
        otherwise a (template path, context) pair; unknown page ids fall
        back to the intro template.
        """
        game_context = {'sampledata': "hello"}  # , documents:documents}
        if page_id == "country_narrative":
            return ('file', servefile("ConflictAssessment_CountryNarrative.pdf",
                                      "Country_Narrative.pdf"))
        if page_id == "page2":
            return ('game_many/conflict_assessment.html', game_context)
        return ('game_many/conflict_assessment_intro.html', game_context)

    def variables(self, page_id=None):
        return ['conflict_assessment']

    def resources(self, game_state, onopen=False, onclosed=False):
        """Return the file resources to expose; empty unless `onopen`."""
        if onopen:
            return [{"page_id": 'country_narrative',
                     "type": 'file',
                     "title": 'Country Narrative.pdf',
                     }]
        else:
            return []


InstalledGames.register_game('conflict_assessment',
                             'Conflict Assessment',
                             ConflictAssessment())


class ExplainYourReportSelection(GameInterface):
    """
    Week 1, Explain Your Report Selection
    """
    def pages(self):
        return ('index', 'page2')

    def template(self, page_id=None, public_state=None):
        """Return the (template path, context) pair for `page_id`."""
        game_context = {'sampledata': "hello"}
        if page_id == "page2":
            return ('game_many/explain_your_report_selection.html',
                    game_context)
        return ('game_many/explain_your_report_selection_intro.html',
                game_context)

    def variables(self, page_id=None):
        return ['explain_your_report_selection']  # ,'additional_information']


InstalledGames.register_game('explain_your_report_selection',
                             'Explain Your Report Selection',
                             ExplainYourReportSelection())


class RecommendingInterventions(GameInterface):
    """
    Week 3, Recommending Interventions
    """
    def pages(self):
        return ('index', 'page2')

    def template(self, page_id=None, public_state=None):
        """
        Return ('file', <servefile result>) for the watching brief,
        otherwise the (template path, context) pair for `page_id`.
        """
        game_context = {'sampledata': "hello"}
        if page_id == "watching_brief":
            return (
                'file',
                servefile("RecommendingInterventions_FirstWatchingBrief.pdf",
                          "First_Watching_Brief.pdf"))
        if page_id == "page2":
            return ('game_many/recommending_interventions.html',
                    game_context)
        return ('game_many/recommending_interventions_intro.html',
                game_context)

    def variables(self, page_id=None):
        return ['recommending_interventions']

    def resources(self, game_state, onopen=False, onclosed=False):
        """Return the file resources to expose; empty unless `onopen`."""
        if onopen:
            return [{"page_id": 'watching_brief',
                     "type": 'file',
                     "title": 'First Watching Brief.pdf',
                     }]
        else:
            return []


InstalledGames.register_game('recommending_interventions',
                             'Recommending Interventions',
                             RecommendingInterventions())


class FundingInterventions(GameInterface):
    """
    Week 4, Funding Interventions
    """
    def pages(self):
        return ('index', 'page2')

    def template(self, page_id=None, public_state=None):
        """Return the (template path, context) pair for `page_id`."""
        game_context = {'sampledata': "hello"}
        if page_id == "page2":
            return ('game_many/funding_interventions.html', game_context)
        return ('game_many/funding_interventions_intro.html', game_context)

    def variables(self, page_id=None):
        return ['funding_interventions']

    def public_variables(self):
        return ['funding_interventions']

    def consequences(self, game_state):
        """Return what DonorsConference would expose when opened."""
        return DonorsConference().resources(game_state, onopen=True)


InstalledGames.register_game('funding_interventions',
                             'Funding Interventions',
                             FundingInterventions())


class TrackingYourProjects(GameInterface):
    """
    Week 4, Tracking Your Projects
    """
    def pages(self):
        return ('index', 'page2',)

    def template(self, page_id=None, public_state=None):
        """
        Return the (template path, context) pair for `page_id`; the context
        carries the 'funded' resource value when one has been published for
        this game, else an empty dict.
        """
        funded = {}
        by_app = public_state['resources_by_app']
        if 'tracking_your_projects' in by_app:
            if 'funded' in by_app['tracking_your_projects']:
                funded = by_app['tracking_your_projects']['funded']['value']
        game_context = {'funded': funded}
        if page_id == "page2":
            return ('game_many/tracking_your_projects.html', game_context)
        return ('game_many/tracking_your_projects_intro.html', game_context)

    def variables(self, page_id=None):
        return ['tracking_your_projects']

    def resources(self, game_state, onopen=False, onclosed=False):
        """
        Return a single 'funded' data resource holding the keys of the
        week-4 funding selection, or an empty list when the game is not
        being opened or no selection exists.
        """
        if onopen and 'funding_interventions' in game_state:
            funded = game_state['funding_interventions']
            return [{"page_id": 'funded',
                     "type": 'data',
                     'value': funded.keys()}]
        # Previously fell through and returned None; every other game
        # returns an empty list here.
        return []

    def autoshock(self, game_state):
        # to appear in Results Framework
        return funding_autoshocks('funding_interventions',
                                  game_state['funding_interventions'])


InstalledGames.register_game('tracking_your_projects',
                             'Tracking Your Projects',
                             TrackingYourProjects())


class ResultsFramework(GameInterface):
    """
    Week 5, Results Framework
    """
    def pages(self):
        return ('index', 'page2',)

    def template(self, page_id=None, public_state=None):
        """
        Return ('file', <servefile result>) for the overview PDF or the
        cluster-matrices spreadsheet, otherwise the (template path, context)
        pair for `page_id`.
        """
        game_context = {'sampledata': "hello"}
        if page_id == "overview":
            return ('file', servefile("ResultsFramework_Overview.pdf",
                                      "Results_Framework_Overview.pdf"))
        if page_id == "matrices":
            return (
                'file',
                servefile("ResultsFramework_ClusterMatrices.xls",
                          "Results_Framework_Cluster_Matrices.xls",
                          "excel/ms-excel"))
        if page_id == "page2":
            return ('game_many/results_framework.html', game_context)
        return ('game_many/results_framework_intro.html', game_context)

    def variables(self, page_id=None):
        return ['results_framework']

    def resources(self, game_state, onopen=False, onclosed=False):
        """Return the file resources to expose; empty unless `onopen`."""
        if onopen:
            return [{"page_id": 'overview',
                     "type": 'file',
                     "title": 'Results Framework Overview.pdf',
                     },
                    {"page_id": 'matrices',
                     "type": 'file',
                     "title": 'Cluster Matrices.xls'}
                    ]
        else:
            return []


InstalledGames.register_game('results_framework',
                             'Results Framework',
                             ResultsFramework())


class DonorsConference(GameInterface):
    """
    Week 6, Donors Conference & More Interventions
    """
    def pages(self):
        return ('index', 'page2',)

    def template(self, page_id=None, public_state=None):
        """
        Return ('file', <servefile result>) for the summary and watching
        brief pages, otherwise the (template path, context) pair for
        `page_id`. The context carries the published 'week4points' and
        'ineligible' values (defaults: 0 and an empty dict).
        """
        points = 0
        ineligible = {}
        by_app = public_state['resources_by_app']
        if 'donors_conference' in by_app:
            published = by_app['donors_conference']
            if 'week4points' in published:
                points = published['week4points']['value']
            if 'ineligible' in published:
                ineligible = published['ineligible']['value']
        # , 'wb2':wb2}
        game_context = {'week4points': points, 'ineligible': ineligible}
        if page_id == "summary":
            return ('file',
                    servefile("DonorsConference_DonorsConferenceSummary.pdf",
                              "Donors_Conference_Summary.pdf"))
        # All three variants are downloaded under the same file name.
        if page_id == "watching_brief_1":
            return ('file', servefile("DonorsConference_WatchingBrief1.pdf",
                                      "Second_Watching_Brief.pdf"))
        if page_id == "watching_brief_2":
            return ('file', servefile("DonorsConference_WatchingBrief2.pdf",
                                      "Second_Watching_Brief.pdf"))
        if page_id == "watching_brief_3":
            return ('file', servefile("DonorsConference_WatchingBrief3.pdf",
                                      "Second_Watching_Brief.pdf"))
        if page_id == "page2":
            return ('game_many/donors_conference.html', game_context)
        return ('game_many/donors_conference_intro.html', game_context)

    def variables(self, page_id=None):
        return ['donors_conference']

    def public_variables(self):
        return ['donors_conference']

    def resources(self, game_state, onopen=False, onclosed=False):
        """
        Return the summary file, one of three "Second Watching Brief"
        variants, and the 'week4points', 'ineligible' and 'wb2' data
        resources. Returns an empty list when the game is not being opened
        or no week-4 funding selection exists.
        """
        if not (onopen and 'funding_interventions' in game_state):
            return []
        funded = game_state['funding_interventions']
        # certain interventions, if funded in week 4, are ineligible to be
        # funded in week 6
        ineligible = [intervention for intervention in funded
                      if intervention in _WEEK6_INELIGIBLE]
        # teams who choose the 3-point option in any of the first 3
        # categories automatically get wb2
        autowb2 = _funded_any(funded, _AUTO_WB2_CHOICES)
        unlock_wb2 = False
        points = funding_points(funded)
        if autowb2 or (points > 17 and points < 30):
            brief_page = 'watching_brief_2'
        elif points <= 17:
            # The original test was `points < 17`, which sent exactly 17
            # points to the "30 or more" branch below. NOTE(review): 17 is
            # assigned to the low bracket by analogy with FinalPaper's
            # `> 32` / `< 33` split; confirm against the game design.
            brief_page = 'watching_brief_1'
        else:  # 30 or more points
            brief_page = 'watching_brief_3'
            # yes, the third watching brief unlocks the WB2 map layers.
            # confusing but correct.
            unlock_wb2 = True
        wb = {"page_id": brief_page, "type": 'file',
              "title": 'Second Watching Brief.pdf'}
        return [{"page_id": 'summary',
                 "type": 'file',
                 "title": 'Donors Conference Summary.pdf',
                 },
                wb,
                {"page_id": 'week4points',
                 "type": 'data', 'value': points},
                {"page_id": 'ineligible', "type":
                 'data', "value": ineligible},
                {"page_id": 'wb2', "type": 'data', "value": unlock_wb2},
                ]

    def consequences(self, game_state):
        """Return what FinalPaper would expose when opened."""
        return FinalPaper().resources(game_state, onopen=True)

    def autoshock(self, game_state):
        return funding_autoshocks('donors_conference',
                                  game_state['donors_conference'])


InstalledGames.register_game('donors_conference',
                             'Donors Conference',
                             DonorsConference())


class FinalPaper(GameInterface):
    """
    Week 8, Final Paper
    """
    def pages(self):
        return ('index', 'page2', 'page3')

    def template(self, page_id=None, public_state=None):
        """
        Return ('file', <servefile result>) for the watching brief pages,
        otherwise the (template path, context) pair for `page_id`.
        """
        game_context = {}
        # All three variants are downloaded under the same file name.
        if page_id == "watching_brief_1":
            return ('file', servefile("FinalPaper_WatchingBrief1.pdf",
                                      "Third_Watching_Brief.pdf"))
        if page_id == "watching_brief_2":
            return ('file', servefile("FinalPaper_WatchingBrief2.pdf",
                                      "Third_Watching_Brief.pdf"))
        if page_id == "watching_brief_3":
            return ('file', servefile("FinalPaper_WatchingBrief3.pdf",
                                      "Third_Watching_Brief.pdf"))
        if page_id == "page2":
            return ('game_many/final_paper.html', game_context)
        if page_id == "page3":
            return ('game_many/final_paper2.html', game_context)
        return ('game_many/final_paper_intro.html', game_context)

    def variables(self, page_id=None):
        return ['final_paper']

    def public_variables(self, page_id=None):
        return ['final_paper']

    def resources(self, game_state, onopen=False, onclosed=False):
        """
        Return one of three "Third Watching Brief" variants plus the 'wb3'
        data resource, chosen from the combined week-4 and week-6 points.
        Returns an empty list when the game is not being opened or no
        week-6 funding selection exists.
        """
        if not (onopen and 'donors_conference' in game_state):
            return []
        # week 4 points carry over
        week4points = 0
        if 'funding_interventions' in game_state:
            week4points = funding_points(
                game_state['funding_interventions'])
        week6points = funding_points(game_state['donors_conference'], 6)
        points = week4points + week6points
        # teams who choose the 3-point option in any of the first 3
        # categories automatically get wb2
        funded = game_state['donors_conference']
        autowb2 = _funded_any(funded, _AUTO_WB2_CHOICES)
        unlock_wb3 = False
        if autowb2 or (points > 32 and points < 59):
            brief_page = 'watching_brief_2'
        elif points < 33:
            brief_page = 'watching_brief_1'
        else:  # points > 58
            brief_page = 'watching_brief_3'
            unlock_wb3 = True
        wb = {"page_id": brief_page, "type": 'file',
              "title": 'Third Watching Brief.pdf'}
        return [wb,
                {"page_id": 'wb3', "type": 'data', "value": unlock_wb3},
                ]


InstalledGames.register_game('final_paper',
                             'Final Paper',
                             FinalPaper())


# helper function ##
def servefile(filename, name, mimetype="application/pdf"):
    """
    Build an attachment HttpResponse streaming `filename` from the
    envirocon.game_many "files" directory, offered to the browser as `name`.

    Raises Http404 when the file cannot be opened.
    """
    # instead of serving the file directly, open it and pipe it over (for
    # security)
    path = InstalledGames.absolute_path("envirocon.game_many",
                                        "files/%s" % filename)
    # return 404 if the file DNE
    try:
        file_handle = open(path, "rb")
    except IOError:
        # Only a failed open maps to 404; the original bare `except:` also
        # swallowed unrelated errors such as KeyboardInterrupt.
        raise Http404  # TODO this doesn't do what i expected
    response = HttpResponse(FileWrapper(file_handle), mimetype=mimetype)
    response['Content-Disposition'] = 'attachment; filename=%s' % name
    response['Content-Length'] = os.path.getsize(path)
    # ... the source listing is truncated here; the rest of this function
    # (presumably returning `response`, since callers use the result) is
    # not visible.

Full Screen

Full Screen

Notifier_test.py

Source:Notifier_test.py Github

copy

Full Screen

1import pytest2from unittest.mock import Mock34import sys5# insert at 1, 0 is the script path (or '' in REPL)6if not '../event-notifier' in sys.path:7 sys.path.insert(1, '../event-notifier')8from EventNotifier.Notifier import Notifier91011@pytest.fixture(scope="class") # scope="function" is default12def logger():13 import logging1415 l = logging.getLogger("multiprocessing")16 h = logging.StreamHandler()17 f = logging.Formatter("%(asctime)s: %(message)s")18 h.setFormatter(f)19 l.addHandler(h)20 l.setLevel(logging.ERROR)21 yield l222324@pytest.fixture(scope="class") # scope="function" is default25def notifier(logger):26 return Notifier(["onCreate", "onOpen", "onModify", "onClose", "onDelete"], logger)272829@pytest.fixture(scope="class") # scope="function" is default30def emptyNotifier(logger):31 return Notifier([], logger)323334@pytest.fixture(scope="class") # scope="function" is default35def notifierWithoutLogger(logger):36 return Notifier(["onCreate", "onOpen", "onModify", "onDelete"])373839class TestNotifier:40 def test_initialised_OK(self, notifier):41 assert 5 == len(notifier.get_registered_events())424344 def test_initiallyEmpty_OK(self, emptyNotifier):45 assert 0 == len(emptyNotifier.get_registered_events())464748 def test_initialisedWithoutLogger_OK(self, notifierWithoutLogger):49 assert 4 == len(notifierWithoutLogger.get_registered_events())505152 def test_initialisedWithRepeatedEventNames_Raises(self, notifierWithoutLogger):53 with pytest.raises(KeyError):54 Notifier(["onCreate", "onOpen", "onModify", "onDelete", "onOpen"])555657 def test_supportedEventsAccessible_OK(self, notifierWithoutLogger):58 notifier = Notifier(["onCreate", "onOpen", "onModify", "onDelete", "onForget"])59 assert ["onCreate", "onOpen", "onModify", "onDelete", "onForget"] == notifier.get_registered_events()606162 def test_fireEvent_OK(self, notifierWithoutLogger):63 onOpenCallback = Mock()6465 notifierWithoutLogger.subscribe("onOpen", onOpenCallback)6667 
onOpenCallback.assert_not_called()68 notifierWithoutLogger.raise_event("onOpen", 77)69 onOpenCallback.assert_called_once_with(77)707172 def test_fireEventWithNoneAsEventName_Raises(self, notifierWithoutLogger):73 onOpenCallback = Mock()7475 notifierWithoutLogger.subscribe("onOpen", onOpenCallback)7677 onOpenCallback.assert_not_called()78 with pytest.raises(KeyError):79 notifierWithoutLogger.raise_event(None, "event info")80 onOpenCallback.assert_not_called()818283 def test_fireEventWithNoneAsParameter_OK(self, notifierWithoutLogger):84 onOpenCallback = Mock()8586 notifierWithoutLogger.subscribe("onOpen", onOpenCallback)8788 onOpenCallback.assert_not_called()89 notifierWithoutLogger.raise_event("onOpen", None)90 onOpenCallback.assert_called_once_with(None)91 notifierWithoutLogger.raise_event("onOpen", None, None, None)92 onOpenCallback.assert_called_with(None, None, None)93 assert(2 == onOpenCallback.call_count)949596 def test_fireEventOnlyCorrectIsFired_OK(self, notifierWithoutLogger):97 onOpenCallback = Mock()98 onDeleteCallback = Mock()99100 notifierWithoutLogger.subscribe("onOpen", onOpenCallback)101102 onOpenCallback.assert_not_called()103 onDeleteCallback.assert_not_called()104105 notifierWithoutLogger.raise_event("onDelete", 258)106 notifierWithoutLogger.subscribe("onDelete", onDeleteCallback)107108 onOpenCallback.assert_not_called()109 onDeleteCallback.assert_not_called()110111 notifierWithoutLogger.raise_event("onOpen", 99)112 onOpenCallback.assert_called_once_with(99)113 onDeleteCallback.assert_not_called()114115 notifierWithoutLogger.raise_event("onDelete", "string as event")116 onOpenCallback.assert_called_once_with(99)117 onDeleteCallback.assert_called_once_with("string as event")118119120 def test_removedSubscriberIsNotFired_OK(self, notifierWithoutLogger):121 onCreateCallback = Mock()122 onModifyCallback = Mock()123124 notifierWithoutLogger.subscribe("onCreate", onCreateCallback)125 notifierWithoutLogger.subscribe("onModify", onModifyCallback)126127 
onCreateCallback.assert_not_called()128 onModifyCallback.assert_not_called()129130 notifierWithoutLogger.raise_event("onCreate", "event: test_removedSubscriberIsNotFired_OK - onCreate")131 onCreateCallback.assert_called_once_with("event: test_removedSubscriberIsNotFired_OK - onCreate")132 onModifyCallback.assert_not_called()133134 notifierWithoutLogger.remove_subscribers_by_event_name("onCreate")135 notifierWithoutLogger.raise_event("onCreate", "event: second time")136137 onCreateCallback.assert_called_once_with("event: test_removedSubscriberIsNotFired_OK - onCreate")138 onModifyCallback.assert_not_called()139140 notifierWithoutLogger.raise_event("onModify", "onModify is still working...")141 onCreateCallback.assert_called_once_with("event: test_removedSubscriberIsNotFired_OK - onCreate")142 onModifyCallback.assert_called_once_with("onModify is still working...")143144145 def test_allRemovedNotFiredToo_OK(self, notifierWithoutLogger):146 onOpenCallback = Mock()147 onDeleteCallback = Mock()148149 notifierWithoutLogger.subscribe("onOpen", onOpenCallback)150 notifierWithoutLogger.subscribe("onDelete", onDeleteCallback)151152 onOpenCallback.assert_not_called()153 onDeleteCallback.assert_not_called()154155 notifierWithoutLogger.raise_event("onOpen", "event: test_removedSubscriberIsNotFired_OK - onOpen")156 onOpenCallback.assert_called_once_with("event: test_removedSubscriberIsNotFired_OK - onOpen")157 onDeleteCallback.assert_not_called()158159 notifierWithoutLogger.remove_all_subscribers()160161 notifierWithoutLogger.raise_event("onOpen", "event: second time")162163 onOpenCallback.assert_called_once_with("event: test_removedSubscriberIsNotFired_OK - onOpen")164 onDeleteCallback.assert_not_called()165166167 def test_repeatedlyAddingSameSubscriber_OK(self, notifierWithoutLogger):168 onCreateCallback = Mock()169 notifierWithoutLogger.subscribe("onCreate", onCreateCallback)170 notifierWithoutLogger.subscribe("onCreate", onCreateCallback)171 
notifierWithoutLogger.subscribe("onCreate", onCreateCallback)172173 onCreateCallback.assert_not_called()174175 notifierWithoutLogger.raise_event("onCreate", "event: onCreate 2222")176 onCreateCallback.assert_called_once_with("event: onCreate 2222")177178179 #TODO: refactor this test to use parmetrised annotation180 def test_anyTypeAsEvent_OK(self):181 class Box:182 def __init__(self, name):183 self.name = name184185 a = Box("keyBoxA")186 b = Box("keyBoxB")187 c = Box("keyBoxB") # intentionally named this way to look like b188189 notifier = Notifier(["onCreate", 5, 22.58, "onDelete", a, b])190 onCreateCallback = Mock()191 on5Callback = Mock()192 onFloatCallback = Mock()193 onBoxACallback = Mock()194 onBoxBCallback = Mock()195196 notifier.subscribe("onCreate", onCreateCallback)197 notifier.subscribe(5, on5Callback)198 notifier.subscribe(22.58, onFloatCallback)199 notifier.subscribe(a, onBoxACallback)200 notifier.subscribe(b, onBoxBCallback)201202 onCreateCallback.assert_not_called()203 on5Callback.assert_not_called()204 onFloatCallback.assert_not_called()205 onBoxACallback.assert_not_called()206 onBoxBCallback.assert_not_called()207208 notifier.raise_event("onCreate", "event: onCreate !!!!")209 onCreateCallback.assert_called_once_with("event: onCreate !!!!")210 on5Callback.assert_not_called()211 onFloatCallback.assert_not_called()212 onBoxACallback.assert_not_called()213 onBoxBCallback.assert_not_called()214215 with pytest.raises(KeyError):216 notifier.raise_event(6, "event: !!!!! 6 !!!!") # unknown event - KeyError raised217 onCreateCallback.assert_called_once_with("event: onCreate !!!!")218 on5Callback.assert_not_called()219 onFloatCallback.assert_not_called()220 onBoxACallback.assert_not_called()221 onBoxBCallback.assert_not_called()222223 notifier.raise_event(5, "event: !!!!! 5 !!!!") # this one is registered - OK224 onCreateCallback.assert_called_once_with("event: onCreate !!!!")225 on5Callback.assert_called_once_with("event: !!!!! 
5 !!!!")226 onFloatCallback.assert_not_called()227 onBoxACallback.assert_not_called()228 onBoxBCallback.assert_not_called()229230 with pytest.raises(KeyError):231 notifier.raise_event(22.59, "event: !!!!! 22.59 !!!!") # unknown event - KeyError raised232 onCreateCallback.assert_called_once_with("event: onCreate !!!!")233 on5Callback.assert_called_once_with("event: !!!!! 5 !!!!")234 onFloatCallback.assert_not_called()235 onBoxACallback.assert_not_called()236 onBoxBCallback.assert_not_called()237238 notifier.raise_event(22.58, "event: !!!!! 22.58 !!!!") # this one is registered - OK239 onCreateCallback.assert_called_once_with("event: onCreate !!!!")240 on5Callback.assert_called_once_with("event: !!!!! 5 !!!!")241 onFloatCallback.assert_called_once_with("event: !!!!! 22.58 !!!!")242 onBoxACallback.assert_not_called()243 onBoxBCallback.assert_not_called()244245 with pytest.raises(KeyError):246 notifier.raise_event(c, "event: Box c with name like it is b") # unknown event (object is known, name is the same, but object is still different) - KeyError raised247 onCreateCallback.assert_called_once_with("event: onCreate !!!!")248 on5Callback.assert_called_once_with("event: !!!!! 5 !!!!")249 onFloatCallback.assert_called_once_with("event: !!!!! 22.58 !!!!")250 onBoxACallback.assert_not_called()251 onBoxBCallback.assert_not_called()252253 notifier.raise_event(b, "event: Box b")254 onCreateCallback.assert_called_once_with("event: onCreate !!!!")255 on5Callback.assert_called_once_with("event: !!!!! 5 !!!!")256 onFloatCallback.assert_called_once_with("event: !!!!! 
22.58 !!!!")257 onBoxACallback.assert_not_called()258 onBoxBCallback.assert_called_once_with("event: Box b")259260261 def test_notifySubscribersWithMultipleParams_OK(self, notifier):262 onModifyCallback = Mock()263264 notifier.subscribe("onModify", onModifyCallback)265266 onModifyCallback.assert_not_called()267268 notifier.raise_event("onModify", "some text", 25, 16.99, named1 ="named param value", named2 = 25)269 onModifyCallback.assert_called_once_with("some text", 25, 16.99, named1 = "named param value", named2 = 25)270271272 def test_subscribeToAll_OK(self, notifier):273 onAnyCallback = Mock()274275 notifier.subscribe_to_all(onAnyCallback)276277 onAnyCallback.assert_not_called()278279 notifier.raise_event("onCreate", "event specific info here", event_type="onCreate")280 onAnyCallback.assert_called_once_with("event specific info here", event_type="onCreate")281282 notifier.raise_event("onOpen", "event specific info22 here", event_type="onOpen")283 onAnyCallback.assert_called_with("event specific info22 here", event_type="onOpen")284 assert (2 == onAnyCallback.call_count)285286 notifier.raise_event("onClose", "event specific info33 here", event_type="onClose")287 onAnyCallback.assert_called_with("event specific info33 here", event_type="onClose")288 assert (3 == onAnyCallback.call_count)289290291 def test_subscribeToAllIfWasAlreadyRegisteredToOne_OK(self, notifier):292 onAnyCallback = Mock()293294 notifier.subscribe("onCreate", onAnyCallback)295 notifier.subscribe_to_all(onAnyCallback)296297 onAnyCallback.assert_not_called()298299 notifier.raise_event("onCreate", "event specific info here", event_type="onCreate")300 onAnyCallback.assert_called_once_with("event specific info here", event_type="onCreate")301302 notifier.raise_event("onOpen", "event specific info22 here", event_type="onOpen")303 onAnyCallback.assert_called_with("event specific info22 here", event_type="onOpen")304 assert (2 == onAnyCallback.call_count) ...

Full Screen

Full Screen

Examples_test.py

Source:Examples_test.py Github

copy

Full Screen

...3940 def test_remove_subscribers_by_event_name(self):41 from EventNotifier import Notifier42 class FileWatchDog():43 def onOpen(self, fileName, openMode):44 print(f"File {fileName} opened with {openMode} mode")4546 def onClose(self, fileName):47 print(f"File {fileName} closed")484950 def onOpenStandaloneMethod(fileName, openMode):51 print(f"StandaloneMethod: File {fileName} opened with {openMode} mode")5253 watchDog = FileWatchDog()5455 notifier = Notifier(["onCreate", "onOpen", "onModify", "onClose", "onDelete"])5657 notifier.subscribe("onOpen", watchDog.onOpen)58 notifier.subscribe("onOpen", onOpenStandaloneMethod)59 notifier.subscribe("onClose", watchDog.onClose)6061 print("\nAfter subscription:")62 notifier.raise_event("onOpen", openMode="w+", fileName="test_file.txt")63 notifier.raise_event("onClose", fileName="test_file.txt")6465 notifier.remove_subscribers_by_event_name("onOpen")6667 print("\nAfter removal of onOpen subscribers:")68 notifier.raise_event("onOpen", openMode="w+", fileName="test_file.txt")69 notifier.raise_event("onClose", fileName="test_file.txt")7071 notifier.remove_subscribers_by_event_name("onClose")7273 print("\nAfter removal of onClose subscribers:")74 notifier.raise_event("onOpen", openMode="w+", fileName="test_file.txt")75 notifier.raise_event("onClose", fileName="test_file.txt")76777879 def test_remove_all_subscribers(self):80 from EventNotifier import Notifier81 class FileWatchDog():82 def onOpen(self, fileName, openMode):83 print(f"File {fileName} opened with {openMode} mode")8485 def onClose(self, fileName):86 print(f"File {fileName} closed")878889 def onOpenStandaloneMethod(fileName, openMode):90 print(f"StandaloneMethod: File {fileName} opened with {openMode} mode")9192 watchDog = FileWatchDog()9394 notifier = Notifier(["onCreate", "onOpen", "onModify", "onClose", "onDelete"])9596 notifier.subscribe("onOpen", watchDog.onOpen) ...

Full Screen

Full Screen

callback.py

Source:callback.py Github

copy

Full Screen

...6 @abstractmethod7 def onTrade(self, data: MarketData):8 '''onTrade'''9 @abstractmethod10 def onOpen(self, data: MarketData):11 '''onOpen'''12 @abstractmethod13 def onFill(self, data: MarketData):14 '''onFill'''15 @abstractmethod16 def onCancel(self, data: MarketData):17 '''onCancel'''18 @abstractmethod19 def onChange(self, data: MarketData):20 '''onChange'''21 @abstractmethod22 def onError(self, data: MarketData):23 '''onError'''24 def onExit(self):25 '''onExit'''26 pass27 def onAnalyze(self, data):28 '''onAnalyze'''29 pass30 def onHalt(self, data):31 '''onHalt'''32 pass33 def onContinue(self, data):34 '''onContinue'''35 pass36 def callback(self):37 return self38class NullCallback(Callback):39 def __init__(self):40 pass41 def onTrade(self, data: MarketData) -> None:42 pass43 def onReceived(self, data: MarketData) -> None:44 pass45 def onOpen(self, data: MarketData) -> None:46 pass47 def onFill(self, data: MarketData) -> None:48 pass49 def onCancel(self, data: MarketData) -> None:50 pass51 def onChange(self, data: MarketData) -> None:52 pass53 def onError(self, data: MarketData) -> None:54 pass55class Print(Callback):56 def __init__(self,57 onTrade=True,58 onReceived=True,59 onOpen=True,60 onFill=True,61 onCancel=True,62 onChange=True,63 onError=True):64 if not onTrade:65 setattr(self, 'onTrade', False)66 if not onReceived:67 setattr(self, 'onReceived', False)68 if not onOpen:69 setattr(self, 'onOpen', False)70 if not onFill:71 setattr(self, 'onFilled', False)72 if not onCancel:73 setattr(self, 'onCancelled', False)74 if not onChange:75 setattr(self, 'onChange', False)76 if not onError:77 setattr(self, 'onError', False)78 def onTrade(self, data: MarketData) -> None:79 dlog.info(str(data))80 def onReceived(self, data: MarketData) -> None:81 dlog.info(str(data))82 def onOpen(self, data: MarketData) -> None:83 dlog.info(str(data))84 def onFill(self, data: MarketData) -> None:85 dlog.info(str(data))86 def onCancel(self, data: MarketData) -> None:87 
dlog.info(str(data))88 def onChange(self, data: MarketData) -> None:89 dlog.info(str(data))90 def onError(self, data: MarketData) -> None:91 dlog.info(str(data))92 def onAnalyze(self, data) -> None:93 log.info('Analysis')94 pass95 def onHalt(self, data) -> None:96 log.info('Halt')...

Full Screen

Full Screen

websocket_client_threaded.py

Source:websocket_client_threaded.py Github

copy

Full Screen

import threading, Queue
from amitu import websocket_client


class _Writer(threading.Thread):
    """Daemon thread that forwards queued payloads to ``ws._send``."""

    def __init__(self, ws):
        super(_Writer, self).__init__()
        self.daemon = True
        self.ws = ws
        self.queue = Queue.Queue()

    def send(self, data):
        """Queue ``data`` for delivery by the writer thread."""
        self.queue.put(data)

    def run(self):
        """Loop forever, sending each queued item as it becomes available."""
        while True:
            deliver = self.ws._send
            deliver(self.queue.get(block=True))


class WebSocket(websocket_client.WebSocket):
    """
    Threaded WebSocket class

    Reads data from the server on the current thread and sends data on a
    separate daemon thread.

    >>> def onmessage(message): print "onmessage", message
    ...
    >>> def onopen(): print "onopen"
    ...
    >>> def onclose(): print "onclose"
    ...
    >>> ws = WebSocket("ws://server.com:8080/path")
    >>> ws.onopen(onopen)
    >>> ws.onclose(onclose)
    >>> ws.onmessage(onmessage)
    >>> ws.run() # blocks
    """

    def __init__(self, *args, **kw):
        websocket_client.WebSocket.__init__(self, *args, **kw)
        self.writer = _Writer(self)
        self.onopen_handlers = []
        self.onclose_handlers = []
        self.onmessage_handlers = []

    def run(self):
        """Start the writer thread, then run the base-class loop."""
        self.writer.start()
        websocket_client.WebSocket.run(self)

    def send(self, data):
        """Hand ``data`` to the writer thread instead of sending inline."""
        self.writer.send(data)

    def _fire_onopen(self):
        for handler in self.onopen_handlers:
            handler()

    def _fire_onmessage(self, data):
        for handler in self.onmessage_handlers:
            handler(data)

    def _fire_onclose(self):
        for handler in self.onclose_handlers:
            handler()

    def onopen(self, cb):
        """Register ``cb`` to be called with no arguments on open."""
        self.onopen_handlers.append(cb)

    def onmessage(self, cb):
        """Register ``cb`` to be called with each received message."""
        self.onmessage_handlers.append(cb)

    def onclose(self, cb):
        """Register ``cb`` to be called with no arguments on close."""
        self.onclose_handlers.append(cb)


class WebSocketThreaded(WebSocket, threading.Thread):
    """
    WebSocketThreaded

    This is a thread that runs in the background, reading and writing both
    in two different threads.

    >>> def onmessage(message): print "onmessage", message
    ...
    >>> def onopen(): print "onopen"
    ...
    >>> def onclose(): print "onclose"
    ...
    >>> ws = WebSocketThreaded("ws://server.com:8080/path")
    >>> ws.onopen(onopen)
    >>> ws.onclose(onclose)
    >>> ws.onmessage(onmessage)
    >>> ws.start()
    >>> ws.wait()
    """

    def __init__(self, *args, **kw):
        WebSocket.__init__(self, *args, **kw)
        # ... (the source listing is truncated here)

Full Screen

Full Screen

pwave.py

Source:pwave.py Github

copy

Full Screen

1from websocket import *2import websocket, httplib3'''4connect to the socketio server51. perform the HTTP handshake62. open a websocket connection7'''8def connect(self) :9 conn = httplib.HTTPConnection('localhost:8124')10 conn.request('POST','/socket.io/1/')11 resp = conn.getresponse() 12 hskey = resp.read().split(':')[0]13 self._ws = websocket.WebSocket(14 'ws://localhost:8124/socket.io/1/websocket/'+hskey,15 onopen = self._onopen,16 onmessage = self._onmessage)17def my_msg_handler(msg):18 print 'Got "%s"!' % msg19def encode_for_socketio(message):20 """21 Encode 'message' string or dictionary to be able22 to be transported via a Python WebSocket client to 23 a Socket.IO server (which is capable of receiving 24 WebSocket communications). This method taken from 25 gevent-socketio.26 """27 MSG_FRAME = "~m~"28 HEARTBEAT_FRAME = "~h~"29 JSON_FRAME = "~j~"30 if isinstance(message, basestring):31 encoded_msg = message32 elif isinstance(message, (object, dict)):33 return encode_for_socketio(JSON_FRAME + json.dumps(message))34 else:35 raise ValueError("Can't encode message.")36 return MSG_FRAME + str(len(encoded_msg)) + MSG_FRAME + encoded_msg37# socket = websocket.WebSocket('ws://localhost:5000/', onmessage=my_msg_handler)38# self._ws = websocket.WebSocket(39# 'ws://localhost:8124/socket.io/1/websocket/'+hskey,40# onopen = self._onopen,41# onmessage = self._onmessage)42conn = httplib.HTTPConnection('localhost:5000')43conn.request('POST','/socket.io/1/')44resp = conn.getresponse() 45respdata = resp.read()46hskey = respdata.split(':')[0]47print respdata48print hskey49socket = websocket.WebSocket('ws://localhost:5000/socket.io/1/websocket/'+hskey, onmessage=my_msg_handler)50# # socket.onopen = lambda: socket.send(encode_for_socketio('Hello world!'))51senddata = '5:::{"message":"hi"}'52socket.onopen = lambda: socket.send(senddata)53# ws = websocket.create_connection('ws://localhost:5000/data')54# msg = "Hello, world!"55# msg = encode_for_socketio(msg)56# ws.send(msg)57try:58 
asyncore.loop()59except KeyboardInterrupt:...

Full Screen

Full Screen

test_atila_websocket.py

Source:test_atila_websocket.py Github

copy

Full Screen

1import skitai2import confutil3import pprint4def test_websocket (app):5 def onopen (was):6 return 'Welcome'7 @app.route ("/echo")8 @app.websocket (skitai.WS_CHANNEL, 60, onopen = onopen)9 def echo (was, message):10 was.websocket.send ('1st: ' + message)11 return "2nd: " + message12 @app.route ("/echo2")13 @app.websocket (skitai.WS_CHANNEL | skitai.WS_NOTHREAD, 60, onopen = onopen)14 def echo2 (was, message):15 was.websocket.send ('1st: ' + message)16 return "2nd: " + message17 @app.route ("/echo3")18 @app.websocket (skitai.WS_CHANNEL | skitai.WS_SESSION, 60)19 def echo3 (was):20 yield '111'21 with app.test_client ("/", confutil.getroot ()) as cli:22 resp = cli.ws ("/echo", "hello")23 assert resp.status_code == 40324 app.access_control_allow_origin = ["*"]25 with app.test_client ("/", confutil.getroot ()) as cli:26 resp = cli.ws ("/echo", "hello")27 assert resp.status_code == 10128 assert resp.content == b'\x81\x07Welcome'29 resp = cli.ws ("/echo2", "hello2")30 assert resp.status_code == 10131 assert resp.content == b'\x81\x07Welcome'32 resp = cli.ws ("/echo3", "hello3")33 assert resp.status_code == 101...

Full Screen

Full Screen

replace.py

Source:replace.py Github

copy

Full Screen

# Copy balancedBST.md to "balancedBST(1).md", replacing every "$" with a
# katex tag: the 1st, 3rd, 5th ... "$" becomes "{{<katex>}}" and the
# 2nd, 4th, 6th ... becomes "{{</katex>}}". All other characters are copied
# unchanged.

# True when the next "$" should emit the opening tag. The flag is NOT reset
# at line boundaries, so a "$" pair may span several lines.
onOpen = True

# Context managers close both handles even if reading or writing raises;
# the original never closed the output file in the visible code.
with open("balancedBST.md", "r") as source:
    with open("balancedBST(1).md", "w") as target:
        for line in source:
            for char in line:
                if char != "$":
                    target.write(char)
                    continue
                target.write("{{<katex>}}" if onOpen else "{{</katex>}}")
                onOpen = not onOpen

Full Screen

Full Screen

Using AI Code Generation

copy

Full Screen

1var ghost = require('ghostjs');2 .type('input[name="q"]', 'ghostjs')3 .click('input[value="Google Search"]')4 .waitForUrl(/search/)5 .screenshot('google.png')6 .end();7var phantom = require('phantomjs');8phantom.create(function (ph) {9 ph.createPage(function (page) {10 console.log("opened google? ", status);11 page.evaluate(function () { return document.title; }, function (result) {12 console.log('Page title is ' + result);13 ph.exit();14 });15 });16 });17});18var casper = require('casper').create();19 this.fill('form[action="/search"]', { q: 'casperjs' }, true);20});21casper.then(function() {22 this.capture('google.png');23});24casper.run();25var page = require('webpage').create(),26 system = require('system');27 console.log("opened google? ", status);28 page.render('google.png');29 phantom.exit();30});31var Browser = require("zombie");32var browser = new Browser();33 .fill("q", "zombie.js")34 .pressButton("Google Search", function() {35 browser.wait().then(function() {36 browser.screenshot().then(function(buffer) {37 console.log("screenshot is %d bytes", buffer.length);38 });39 });40 });41});42var jsdom = require("jsdom").jsdom;43var document = jsdom();44var window = document.defaultView;45var $ = require("jquery")(window);46var cheerio = require('cheerio'),47 $ = cheerio.load('<h2 class="title">Hello world</h2>');48$('h2.title').text('Hello there!');

Full Screen

Using AI Code Generation

copy

Full Screen

1var ghost = require('ghostjs');2 .type('#lst-ib', 'ghostjs')3 .click('#_fZl')4 .wait(1000)5 .screenshot('google-search.png')6 .end();7### ghost.open(url)8### ghost.type(selector, text)9### ghost.click(selector)10### ghost.wait(ms)11### ghost.screenshot(path)12### ghost.end()131. Fork it (

Full Screen

Using AI Code Generation

copy

Full Screen

1var ghost = require('ghostjs');2.then(function() {3 return ghost.fill('#lst-ib', 'ghostjs');4})5.then(function() {6 return ghost.click('#_fZl');7})8.then(function() {9 return ghost.wait(5000);10})11.then(function() {12 return ghost.screenshot('google.png');13})14.then(function() {15 return ghost.close();16})17.catch(function(e) {18 console.log(e);19});20var casper = require('casper').create();21 this.fill('form[action="/search"]', { q: 'casperjs' }, true);22});23casper.then(function() {24 this.capture('google.png');25});26casper.run();27var page = require('webpage').create();28 console.log("Status: " + status);29 if(status === "success") {30 page.render('google.png');31 }32 phantom.exit();33});34var webdriver = require('selenium-webdriver');35var driver = new webdriver.Builder()36 .withCapabilities(webdriver.Capabilities.chrome())37 .build();38driver.findElement(webdriver.By.name('q')).sendKeys('webdriver');39driver.findElement(webdriver.By.name('btnG')).click();40driver.wait(function() {41 return driver.getTitle().then(function(title) {42 return title === 'webdriver - Google Search';43 });44}, 1000);45driver.takeScreenshot().then(function(data) {46 require('fs').writeFile('google.png', data, 'base64', function(err) {47 if(err) {48 console.log(err);49 }50 });51});52driver.quit();53module.exports = {54 'Demo test Google' : function (browser) {55 .waitForElementVisible('body', 1000)56 .setValue('input[type=text]', 'nightwatch')57 .waitForElementVisible('button[name=btnG]', 1000)58 .click('button[name=btnG]')59 .pause(1000)60 .assert.containsText('#

Full Screen

Using AI Code Generation

copy

Full Screen

1var ghost = require('ghostjs');2 .type('#username', 'admin')3 .type('#password', 'admin')4 .click('input[type="submit"]')5 .waitFor('#content', function() {6 console.log('Login successful');7 ghost.screenshot('login.png');8 })9 .end();10The MIT License (MIT)

Full Screen

Using AI Code Generation

copy

Full Screen

1const ghost = require('ghostjs');2const assert = require('assert');3ghost.open(url)4.then(() => ghost.waitForPageToLoad())5.then(() => ghost.waitForSelector('#lst-ib'))6.then(() => ghost.type('#lst-ib', 'ghostjs'))7.then(() => ghost.waitForSelector('.sbico-c'))8.then(() => ghost.click('.sbico-c'))9.then(() => ghost.waitForPageToLoad())10.then(() => ghost.waitForSelector('#resultStats'))11.then(() => ghost.getText('#resultStats'))12.then((text) => {13 assert.ok(text.match(/About/));14 console.log('Test passed!');15})16.catch((e) => {17 console.log('Test failed!');18 console.log(e);19});20### open(url)21### close()22### waitForPageToLoad()23### waitForSelector(selector)24### click(selector)25### type(selector, text)26### getText(selector)27### getAttribute(selector, attribute)28### getHtml(selector)29### getOuterHtml(selector)30### getCssProperty(selector, property)

Full Screen

Using AI Code Generation

copy

Full Screen

// Open a page, set the viewport to 1024x768, save a screenshot as
// google.png next to this script, then shut the browser down.
// NOTE(review): the callback-style calls (open/set/screenshot/exit) are kept
// exactly as the original snippet used them; confirm them against the
// ghostjs version actually installed.
var ghost = require('ghostjs');
var path = require('path');

// The original referenced `url` without ever declaring it, which throws a
// ReferenceError in a Node script. Value inferred from the 'google.png'
// output name -- TODO confirm the intended target page.
var url = 'https://www.google.com';

// Report a failed step and stop, instead of silently continuing with the
// next step as the original did.
function fail(err) {
    console.error(err);
    ghost.exit();
}

ghost.open(url, function(err, data) {
    if (err) { return fail(err); }
    ghost.set('viewportSize', {width: 1024, height: 768}, function(err, data) {
        if (err) { return fail(err); }
        ghost.screenshot(path.join(__dirname, 'google.png'), function(err, data) {
            if (err) { return fail(err); }
            ghost.exit();
        });
    });
});

Full Screen

Using AI Code Generation

copy

Full Screen

// Open a page, set the viewport to 1024x768, render it to google.png,
// log 'done', and close the window.
// NOTE(review): the callback-style calls (open/set/render/close) are kept
// exactly as the original snippet used them; confirm them against the
// ghostjs version actually installed.
var ghost = require('ghostjs');
var assert = require('assert');

// The original referenced `url` without ever declaring it, which throws a
// ReferenceError in a Node script. Value inferred from the 'google.png'
// output name -- TODO confirm the intended target page.
var url = 'https://www.google.com';

ghost.open(url, function(err, window){
    if (err) { return console.error(err); }

    // Report a failed step and release the window; the original ignored
    // these errors and logged 'done' even when rendering failed.
    function fail(stepErr) {
        console.error(stepErr);
        window.close();
    }

    window.set('viewportSize', {width: 1024, height: 768}, function(err){
        if (err) { return fail(err); }
        window.render('google.png', function(err){
            if (err) { return fail(err); }
            console.log('done');
            window.close();
        });
    });
});

Full Screen

Using AI Code Generation

copy

Full Screen

1var ghost = require('ghostjs');2 console.log('GhostJS is ready to use');3});4### open(url)5### close()6### screenshot(filename)7### waitForSelector(selector, timeout)8### waitForVisible(selector, timeout)9### waitForHidden(selector, timeout)10### waitForText(selector, text, timeout)11### waitForUrl(url, timeout)12### click(selector)13### type(selector, text)14### select(selector, value)15### upload(selector, filename)16### evaluate(fn, ...args)17### injectJs(filename)18### injectCss(filename)

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run ghostjs automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful