How to use _send_message method in localstack

Best Python code snippet using localstack_python

marionette.py

Source:marionette.py Github

copy

Full Screen

...33 return self.marionette.find_element(method, target, self.id)34 def find_elements(self, method, target):35 return self.marionette.find_elements(method, target, self.id)36 def get_attribute(self, attribute):37 return self.marionette._send_message('getElementAttribute', 'value', element=self.id, name=attribute)38 def click(self):39 return self.marionette._send_message('clickElement', 'ok', element=self.id)40 def single_tap(self, x=None, y=None):41 return self.marionette._send_message('singleTap', 'ok', element=self.id, x=x, y=y)42 def double_tap(self, x=None, y=None):43 return self.marionette._send_message('doubleTap', 'ok', element=self.id, x=x, y=y)44 def press(self, x=None, y=None):45 return self.marionette._send_message('press', 'value', element=self.id, x=x, y=y)46 def release(self, touch_id, x=None, y=None):47 return self.marionette._send_message('release', 'ok', element=self.id, touchId=touch_id, x=x, y=y)48 def cancel_touch(self, touch_id):49 return self.marionette._send_message('cancelTouch', 'ok', element=self.id, touchId=touch_id)50 @property51 def text(self):52 return self.marionette._send_message('getElementText', 'value', element=self.id)53 def send_keys(self, *string):54 typing = []55 for val in string:56 if isinstance(val, Keys):57 typing.append(val)58 elif isinstance(val, int):59 val = str(val)60 for i in range(len(val)):61 typing.append(val[i])62 else:63 for i in range(len(val)):64 typing.append(val[i])65 return self.marionette._send_message('sendKeysToElement', 'ok', element=self.id, value=typing)66 def clear(self):67 return self.marionette._send_message('clearElement', 'ok', element=self.id)68 def is_selected(self):69 return self.marionette._send_message('isElementSelected', 'value', element=self.id)70 def is_enabled(self):71 return self.marionette._send_message('isElementEnabled', 'value', element=self.id)72 def is_displayed(self):73 return self.marionette._send_message('isElementDisplayed', 'value', element=self.id)74 @property75 def size(self):76 return self.marionette._send_message('getElementSize', 'value', element=self.id)77 @property78 def tag_name(self):79 return self.marionette._send_message('getElementTagName', 'value', element=self.id)80 @property81 def location(self):82 return self.marionette._send_message('getElementPosition', 'value', element=self.id)83class Actions(object):84 def __init__(self, marionette):85 self.action_chain = []86 self.marionette = marionette87 self.current_id = None88 def press(self, element, x=None, y=None):89 element=element.id90 self.action_chain.append(['press', element, x, y])91 return self92 def release(self):93 self.action_chain.append(['release'])94 return self95 def move(self, element):96 element=element.id97 self.action_chain.append(['move', element])98 return self99 def move_by_offset(self, x, y):100 self.action_chain.append(['moveByOffset', x, y])101 return self102 def wait(self, time=None):103 self.action_chain.append(['wait', time])104 return self105 def cancel(self):106 self.action_chain.append(['cancel'])107 return self108 def perform(self):109 self.current_id = self.marionette._send_message('actionChain', 'value', chain=self.action_chain, nextId=self.current_id)110 self.action_chain = []111 return self112class MultiActions(object):113 def __init__(self, marionette):114 self.multi_actions = []115 self.max_length = 0116 self.marionette = marionette117 def add(self, action):118 self.multi_actions.append(action.action_chain)119 if len(action.action_chain) > self.max_length:120 self.max_length = len(action.action_chain)121 return self122 def perform(self):123 return self.marionette._send_message('multiAction', 'ok', value=self.multi_actions, max_length=self.max_length)124class Marionette(object):125 CONTEXT_CHROME = 'chrome'126 CONTEXT_CONTENT = 'content'127 TIMEOUT_SEARCH = 'implicit'128 TIMEOUT_SCRIPT = 'script'129 TIMEOUT_PAGE = 'page load'130 def __init__(self, host='localhost', port=2828, bin=None, profile=None,131 emulator=None, sdcard=None, emulatorBinary=None,132 emulatorImg=None, emulator_res=None, gecko_path=None,133 connectToRunningEmulator=False, homedir=None, baseurl=None,134 noWindow=False, logcat_dir=None, busybox=None, symbols_path=None):135 self.host = host136 self.port = self.local_port = port137 self.bin = bin138 self.instance = None139 self.profile = profile140 self.session = None141 self.window = None142 self.emulator = None143 self.extra_emulators = []144 self.homedir = homedir145 self.baseurl = baseurl146 self.noWindow = noWindow147 self.logcat_dir = logcat_dir148 self._test_name = None149 self.symbols_path = symbols_path150 if bin:151 port = int(self.port)152 if not Marionette.is_port_available(port, host=self.host):153 ex_msg = "%s:%d is unavailable." % (self.host, port)154 raise MarionetteException(message=ex_msg)155 self.instance = GeckoInstance(host=self.host, port=self.port,156 bin=self.bin, profile=self.profile)157 self.instance.start()158 assert(self.wait_for_port())159 if emulator:160 self.emulator = Emulator(homedir=homedir,161 noWindow=self.noWindow,162 logcat_dir=self.logcat_dir,163 arch=emulator,164 sdcard=sdcard,165 emulatorBinary=emulatorBinary,166 userdata=emulatorImg,167 res=emulator_res)168 self.emulator.start()169 self.port = self.emulator.setup_port_forwarding(self.port)170 assert(self.emulator.wait_for_port())171 if connectToRunningEmulator:172 self.emulator = Emulator(homedir=homedir,173 logcat_dir=self.logcat_dir)174 self.emulator.connect()175 self.port = self.emulator.setup_port_forwarding(self.port)176 assert(self.emulator.wait_for_port())177 self.client = MarionetteClient(self.host, self.port)178 if emulator:179 self.emulator.setup(self,180 gecko_path=gecko_path,181 busybox=busybox)182 def __del__(self):183 if self.emulator:184 self.emulator.close()185 if self.instance:186 self.instance.close()187 for qemu in self.extra_emulators:188 qemu.emulator.close()189 @staticmethod190 def is_port_available(port, host=''):191 port = int(port)192 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)193 try:194 s.bind((host, port))195 return True196 except socket.error:197 return False198 finally:199 s.close()200 @classmethod201 def getMarionetteOrExit(cls, *args, **kwargs):202 try:203 m = cls(*args, **kwargs)204 return m205 except InstallGeckoError:206 # Bug 812395 - the process of installing gecko into the emulator207 # and then restarting B2G tickles some bug in the emulator/b2g208 # that intermittently causes B2G to fail to restart. To work209 # around this in TBPL runs, we will fail gracefully from this210 # error so that the mozharness script can try the run again.211 # This string will get caught by mozharness and will cause it212 # to retry the tests.213 print "Error installing gecko!"214 # Exit without a normal exception to prevent mozharness from215 # flagging the error.216 sys.exit()217 def wait_for_port(self, timeout=3000):218 starttime = datetime.datetime.now()219 while datetime.datetime.now() - starttime < datetime.timedelta(seconds=timeout):220 try:221 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)222 sock.connect((self.host, self.port))223 data = sock.recv(16)224 sock.close()225 if '"from"' in data:226 time.sleep(5)227 return True228 except socket.error:229 pass230 time.sleep(1)231 return False232 def _send_message(self, command, response_key, **kwargs):233 if not self.session and command not in ('newSession', 'getStatus'):234 raise MarionetteException(message="Please start a session")235 message = { 'type': command }236 if self.session:237 message['session'] = self.session238 if kwargs:239 message.update(kwargs)240 try:241 response = self.client.send(message)242 except socket.timeout:243 self.session = None244 self.window = None245 self.client.close()246 raise TimeoutException(message='socket.timeout', status=ErrorCodes.TIMEOUT, stacktrace=None)247 # Process any emulator commands that are sent from a script248 # while it's executing.249 while response.get("emulator_cmd"):250 response = self._handle_emulator_cmd(response)251 if (response_key == 'ok' and response.get('ok') == True) or response_key in response:252 return response[response_key]253 else:254 self._handle_error(response)255 def _handle_emulator_cmd(self, response):256 cmd = response.get("emulator_cmd")257 if not cmd or not self.emulator:258 raise MarionetteException(message="No emulator in this test to run "259 "command against.")260 cmd = cmd.encode("ascii")261 result = self.emulator._run_telnet(cmd)262 return self.client.send({"type": "emulatorCmdResult",263 "id": response.get("id"),264 "result": result})265 def _handle_error(self, response):266 if 'error' in response and isinstance(response['error'], dict):267 status = response['error'].get('status', 500)268 message = response['error'].get('message')269 stacktrace = response['error'].get('stacktrace')270 # status numbers come from 271 # http://code.google.com/p/selenium/wiki/JsonWireProtocol#Response_Status_Codes272 if status == ErrorCodes.NO_SUCH_ELEMENT:273 raise NoSuchElementException(message=message, status=status, stacktrace=stacktrace)274 elif status == ErrorCodes.NO_SUCH_FRAME:275 raise NoSuchFrameException(message=message, status=status, stacktrace=stacktrace)276 elif status == ErrorCodes.STALE_ELEMENT_REFERENCE:277 raise StaleElementException(message=message, status=status, stacktrace=stacktrace)278 elif status == ErrorCodes.ELEMENT_NOT_VISIBLE:279 raise ElementNotVisibleException(message=message, status=status, stacktrace=stacktrace)280 elif status == ErrorCodes.INVALID_ELEMENT_STATE:281 raise InvalidElementStateException(message=message, status=status, stacktrace=stacktrace)282 elif status == ErrorCodes.UNKNOWN_ERROR:283 raise MarionetteException(message=message, status=status, stacktrace=stacktrace)284 elif status == ErrorCodes.ELEMENT_IS_NOT_SELECTABLE:285 raise ElementNotSelectableException(message=message, status=status, stacktrace=stacktrace)286 elif status == ErrorCodes.JAVASCRIPT_ERROR:287 raise JavascriptException(message=message, status=status, stacktrace=stacktrace)288 elif status == ErrorCodes.XPATH_LOOKUP_ERROR:289 raise XPathLookupException(message=message, status=status, stacktrace=stacktrace)290 elif status == ErrorCodes.TIMEOUT:291 raise TimeoutException(message=message, status=status, stacktrace=stacktrace)292 elif status == ErrorCodes.NO_SUCH_WINDOW:293 raise NoSuchWindowException(message=message, status=status, stacktrace=stacktrace)294 elif status == ErrorCodes.INVALID_COOKIE_DOMAIN:295 raise InvalidCookieDomainException(message=message, status=status, stacktrace=stacktrace)296 elif status == ErrorCodes.UNABLE_TO_SET_COOKIE:297 raise UnableToSetCookieException(message=message, status=status, stacktrace=stacktrace)298 elif status == ErrorCodes.NO_ALERT_OPEN:299 raise NoAlertPresentException(message=message, status=status, stacktrace=stacktrace)300 elif status == ErrorCodes.SCRIPT_TIMEOUT:301 raise ScriptTimeoutException(message=message, status=status, stacktrace=stacktrace)302 elif status == ErrorCodes.INVALID_SELECTOR \303 or status == ErrorCodes.INVALID_XPATH_SELECTOR \304 or status == ErrorCodes.INVALID_XPATH_SELECTOR_RETURN_TYPER:305 raise InvalidSelectorException(message=message, status=status, stacktrace=stacktrace)306 elif status == ErrorCodes.MOVE_TARGET_OUT_OF_BOUNDS:307 MoveTargetOutOfBoundsException(message=message, status=status, stacktrace=stacktrace)308 else:309 raise MarionetteException(message=message, status=status, stacktrace=stacktrace)310 raise MarionetteException(message=response, status=500)311 def check_for_crash(self):312 returncode = None313 name = None314 crashed = False315 if self.emulator:316 if self.emulator.check_for_crash():317 returncode = self.emulator.proc.returncode318 name = 'emulator'319 crashed = True320 if self.symbols_path and self.emulator.check_for_minidumps(self.symbols_path):321 crashed = True322 elif self.instance:323 # In the future, a check for crashed Firefox processes324 # should be here.325 pass326 if returncode is not None:327 print ('PROCESS-CRASH | %s | abnormal termination with exit code %d' %328 (name, returncode))329 return crashed330 def absolute_url(self, relative_url):331 return "%s%s" % (self.baseurl, relative_url)332 def status(self):333 return self._send_message('getStatus', 'value')334 def start_session(self, desired_capabilities=None):335 try:336 # We are ignoring desired_capabilities, at least for now.337 self.session = self._send_message('newSession', 'value')338 except:339 traceback.print_exc()340 self.check_for_crash()341 sys.exit()342 self.b2g = 'b2g' in self.session343 return self.session344 @property345 def test_name(self):346 return self._test_name347 @test_name.setter348 def test_name(self, test_name):349 if self._send_message('setTestName', 'ok', value=test_name):350 self._test_name = test_name351 def delete_session(self):352 response = self._send_message('deleteSession', 'ok')353 self.session = None354 self.window = None355 self.client.close()356 return response357 @property358 def session_capabilities(self):359 response = self._send_message('getSessionCapabilities', 'value')360 return response361 def set_script_timeout(self, timeout):362 response = self._send_message('setScriptTimeout', 'ok', value=timeout)363 return response364 def set_search_timeout(self, timeout):365 response = self._send_message('setSearchTimeout', 'ok', value=timeout)366 return response367 @property368 def current_window_handle(self):369 self.window = self._send_message('getWindow', 'value')370 return self.window371 @property372 def title(self):373 response = self._send_message('getTitle', 'value') 374 return response375 @property376 def window_handles(self):377 response = self._send_message('getWindows', 'value')378 return response379 @property380 def page_source(self):381 response = self._send_message('getPageSource', 'value')382 return response383 def close(self, window_id=None):384 if not window_id:385 window_id = self.current_window_handle386 response = self._send_message('closeWindow', 'ok', value=window_id)387 return response388 def set_context(self, context):389 assert(context == self.CONTEXT_CHROME or context == self.CONTEXT_CONTENT)390 return self._send_message('setContext', 'ok', value=context)391 def switch_to_window(self, window_id):392 response = self._send_message('switchToWindow', 'ok', value=window_id)393 self.window = window_id394 return response395 def switch_to_frame(self, frame=None, focus=True):396 if isinstance(frame, HTMLElement):397 response = self._send_message('switchToFrame', 'ok', element=frame.id, focus=focus)398 else:399 response = self._send_message('switchToFrame', 'ok', value=frame, focus=focus)400 return response401 def get_url(self):402 response = self._send_message('getUrl', 'value')403 return response404 def navigate(self, url):405 response = self._send_message('goUrl', 'ok', value=url)406 return response407 def timeouts(self, timeout_type, ms):408 assert(timeout_type == self.TIMEOUT_SEARCH or timeout_type == self.TIMEOUT_SCRIPT or timeout_type == self.TIMEOUT_PAGE)409 response = self._send_message('timeouts', 'ok', timeoutType=timeout_type, ms=ms)410 return response411 def go_back(self):412 response = self._send_message('goBack', 'ok')413 return response414 def go_forward(self):415 response = self._send_message('goForward', 'ok')416 return response417 def refresh(self):418 response = self._send_message('refresh', 'ok')419 return response420 def wrapArguments(self, args):421 if isinstance(args, list):422 wrapped = []423 for arg in args:424 wrapped.append(self.wrapArguments(arg))425 elif isinstance(args, dict):426 wrapped = {}427 for arg in args:428 wrapped[arg] = self.wrapArguments(args[arg])429 elif type(args) == HTMLElement:430 wrapped = {'ELEMENT': args.id }431 elif (isinstance(args, bool) or isinstance(args, basestring) or432 isinstance(args, int) or isinstance(args, float) or args is None):433 wrapped = args434 return wrapped435 def unwrapValue(self, value):436 if isinstance(value, list):437 unwrapped = []438 for item in value:439 unwrapped.append(self.unwrapValue(item))440 elif isinstance(value, dict):441 unwrapped = {}442 for key in value:443 if key == 'ELEMENT':444 unwrapped = HTMLElement(self, value[key])445 else:446 unwrapped[key] = self.unwrapValue(value[key])447 else:448 unwrapped = value449 return unwrapped450 def execute_js_script(self, script, script_args=None, async=True, new_sandbox=True, special_powers=False, script_timeout=None):451 if script_args is None:452 script_args = []453 args = self.wrapArguments(script_args)454 response = self._send_message('executeJSScript',455 'value',456 value=script,457 args=args,458 async=async,459 newSandbox=new_sandbox,460 specialPowers=special_powers, 461 scriptTimeout=script_timeout)462 return self.unwrapValue(response)463 def execute_script(self, script, script_args=None, new_sandbox=True, special_powers=False, script_timeout=None):464 if script_args is None:465 script_args = []466 args = self.wrapArguments(script_args)467 response = self._send_message('executeScript',468 'value',469 value=script,470 args=args,471 newSandbox=new_sandbox,472 specialPowers=special_powers,473 scriptTimeout=script_timeout)474 return self.unwrapValue(response)475 def execute_async_script(self, script, script_args=None, new_sandbox=True, special_powers=False, script_timeout=None):476 if script_args is None:477 script_args = []478 args = self.wrapArguments(script_args)479 response = self._send_message('executeAsyncScript',480 'value',481 value=script,482 args=args,483 newSandbox=new_sandbox,484 specialPowers=special_powers,485 scriptTimeout=script_timeout)486 return self.unwrapValue(response)487 def find_element(self, method, target, id=None):488 kwargs = { 'value': target, 'using': method }489 if id:490 kwargs['element'] = id491 response = self._send_message('findElement', 'value', **kwargs)492 element = HTMLElement(self, response)493 return element494 def find_elements(self, method, target, id=None):495 kwargs = { 'value': target, 'using': method }496 if id:497 kwargs['element'] = id498 response = self._send_message('findElements', 'value', **kwargs)499 assert(isinstance(response, list))500 elements = []501 for x in response:502 elements.append(HTMLElement(self, x))503 return elements504 def get_active_element(self):505 response = self._send_message('getActiveElement', 'value')506 return HTMLElement(self, response)507 def log(self, msg, level=None):508 return self._send_message('log', 'ok', value=msg, level=level)509 def get_logs(self):510 return self._send_message('getLogs', 'value')511 def add_perf_data(self, suite, name, value):512 return self._send_message('addPerfData', 'ok', suite=suite, name=name, value=value)513 def get_perf_data(self):514 return self._send_message('getPerfData', 'value')515 def import_script(self, js_file):516 js = ''517 with open(js_file, 'r') as f:518 js = f.read()519 return self._send_message('importScript', 'ok', script=js)520 def add_cookie(self, cookie):521 """522 Adds a cookie to your current session.523 :Args:524 - cookie_dict: A dictionary object, with required keys - "name" and "value";525 optional keys - "path", "domain", "secure", "expiry"526 Usage:527 driver.add_cookie({'name' : 'foo', 'value' : 'bar'})528 driver.add_cookie({'name' : 'foo', 'value' : 'bar', 'path' : '/'})529 driver.add_cookie({'name' : 'foo', 'value' : 'bar', 'path' : '/',530 'secure':True})531 """532 return self._send_message('addCookie', 'ok', cookie=cookie)533 def delete_all_cookies(self):534 """535 Delete all cookies in the scope of the session.536 :Usage:537 driver.delete_all_cookies()538 """539 return self._send_message('deleteAllCookies', 'ok')540 def delete_cookie(self, name):541 """542 Delete a cookie by its name543 :Usage:544 driver.delete_cookie('foo')545 """546 return self._send_message('deleteCookie', 'ok', name=name);547 def get_cookie(self, name):548 """549 Get a single cookie by name. Returns the cookie if found, None if not.550 :Usage:551 driver.get_cookie('my_cookie')552 """553 cookies = self.get_cookies()554 for cookie in cookies:555 if cookie['name'] == name:556 return cookie557 return None558 def get_cookies(self):559 return self._send_message("getAllCookies", "value")560 @property561 def application_cache(self):562 return ApplicationCache(self)563 def screenshot(self, element=None, highlights=None):564 if element is not None:565 element = element.id...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful