Best Python code snippet using autotest_python
wax_server.py
Source:wax_server.py  
#!/usr/bin/python
# Copyright 2013 Google Inc. All Rights Reserved.
"""Implements the Wax Service testing API as a simple web service.

This is a basic implementation of the Wax Service in a form that lends
itself to testing in an open source release since the Wax Service is not
yet publicly accessible. It is implemented as a plain web server rather
than Google Cloud Service, but the HTTP interface is the same. Therefore,
this is suitable for testing basic HTTP interactions and end-to-end testing
for many client features.

Usage:
  wax_server [-g] [--port=<port>] [--signal_pid=<pid>]
     -g allows the server to accept connections from any IP.
     The default is only from localhost.
     --port specifies the port to listen on.
     The default is 5000.
     --signal_pid specifies a PID that should be notified with SIGUSR1 when
       the server is ready.
"""

try:  # Python 3
  from http.server import BaseHTTPRequestHandler
  from http.server import HTTPServer
except ImportError:  # Python 2
  from BaseHTTPServer import BaseHTTPRequestHandler
  from BaseHTTPServer import HTTPServer
import copy
import getopt
import json
import os
import re
import signal
import string
import sys
import threading
import time

_RE_SESSION_COMMAND = re.compile('^/sessions/([^/]+)/items')
_RE_ITEM_COMMAND = re.compile('^/sessions/([^/]+)/items/([^/]+)')
# A chunk-size field is one or more hex digits (no sign, no '0x' prefix).
_RE_CHUNK_SIZE = re.compile(b'^[0-9A-Fa-f]+$')
_JSON_CONTENT_TYPE = 'application/json'
_CRLF = b'\r\n'
# Mutable so that a request handler can ask the serving loop in main() to stop.
_SERVER_SETTINGS = {'run': True}


class SessionData(object):
  """Session data maintains a list of objects for a given session id.

  These objects are thread-safe and live over the lifetime of the server.
  They are destroyed by explicitly removing the session, which is not at
  all reliable since clients are not required to do so, and may crash, etc.
  However, this is a testing server that is not expected to be run for long
  periods of time.

  A session contains a list of items where an item is a dictionary.
  Items have a 'kind' attribute to make them compatible with the Wax Service.
  The session is initialized with two items, A and B, to be compatible with
  the Wax Service.
  """

  def __init__(self):
    """Constructs a new session instance."""
    # time.clock() was removed in Python 3.8; wall-clock time works everywhere.
    self._created = time.time()
    self._mutex = threading.Lock()
    # Add items A and B by default to mimic the service's behavior.
    self._items = [
        self.AddWaxKind({'id': 'A', 'name': 'Item A'}),
        self.AddWaxKind({'id': 'B', 'name': 'Item B'}),
    ]

  def _FindItem(self, key):
    """Finds item with the given id, if present in the session.

    This method is not thread-safe. It relies on the caller to lock.

    Args:
      key: (string) Identifier to search for.

    Returns:
      A reference to the item or None if one is not present.
    """
    for session_item in self._items:
      # .get() so that an item stored without an 'id' cannot break lookups.
      if session_item.get('id') == key:
        return session_item
    return None

  def AddNewItem(self, key, item):
    """Add an item to the session list if the specified identifier is unique.

    Args:
      key: (string) The item's identifier
      item: (dict) The item to add must have an 'id' and will be given
                   a 'kind' attribute.

    Returns:
      The actual values for the added item (including added values).
      None indicates a failure (because the identifier already exists)
    """
    with self._mutex:
      if self._FindItem(key):
        return None
      wax = self.AddWaxKind(item)
      # Store a private copy so later caller-side mutation cannot leak in.
      self._items.append(copy.deepcopy(wax))
    return wax

  def ReplaceItem(self, key, item):
    """Inserts the item into the session, or replaces the item for identifier.

    Args:
      key: (string) The items identifier.
      item: (dict) The item to insert or replace the existing item with.
                   A 'kind' attribute will be added if needed.

    Returns:
       The inserted item (the session stores its own deep copy of it).
    """
    wax = self.AddWaxKind(item)
    with self._mutex:
      session_item = self._FindItem(key)
      if session_item:
        self._items.remove(session_item)
      self._items.append(copy.deepcopy(wax))
    return wax

  def PatchItem(self, key, item):
    """Updates an existing item with the given id with the elements of item.

    This overwrites the existing data with the new data. Any old values that
    were not contained in item are preserved.

    Args:
      key: (string) The identifier to update
      item: (dict) The particular values to update.

    Returns:
      A copy of the patched item or None on failure.
    """
    with self._mutex:
      session_item = self._FindItem(key)
      if not session_item:
        return None
      self._items.remove(session_item)
      # Merge old and new values; new values win. (Works on Python 2 and 3,
      # unlike concatenating the results of dict.items().)
      wax = dict(session_item)
      wax.update(item)
      self._items.append(copy.deepcopy(wax))
    return wax

  def DeleteItem(self, key):
    """Deletes an item from the session.

    Args:
      key: (string) The identifier of the item to delete.

    Returns:
      The deleted item or None.
    """
    with self._mutex:
      session_item = self._FindItem(key)
      if session_item:
        self._items.remove(session_item)
    return session_item

  def GetItemCopy(self, key):
    """Get a copy of the item with the given identifier.

    This returns a copy for simple thread-safety.

    Args:
      key: (string) The identifier of the item to return

    Returns:
      None if the identifier isn't present. Otherwise a copy of the item.
    """
    with self._mutex:
      session_item = self._FindItem(key)
      if session_item:
        return copy.deepcopy(session_item)
    return None

  def GetAllItemsCopy(self):
    """Returns a copy of the list of all items.

    Returns:
      A copy is returned for simple thread-safety.
    """
    with self._mutex:
      return copy.deepcopy(self._items)

  def AddWaxKind(self, item):
    """Helper function that adds the 'kind' attribute to an item.

    Args:
      item: (dict)  The item to modify (it is modified in place).

    Returns:
      A reference to the item with a 'kind' attribute added.
    """
    item['kind'] = 'wax#waxDataItem'
    return item


class Repository(object):
  """A repository of sessions keyed by sessionId.

     The repository is thread-safe.
  """

  def __init__(self):
    """Initializes an empty repository."""
    self._sessions = {}
    # Wall-clock start time distinguishes identifiers across server restarts
    # (time.clock() no longer exists as of Python 3.8).
    self._nodeid = str(time.time())
    self._sequence_num = 0
    self._mutex = threading.Lock()

  def GetSessionData(self, key):
    """Returns a reference to the session with the given identifier.

    Args:
      key: (string) The session identifier.

    Returns:
      A reference to the SessionData or None.
    """
    with self._mutex:
      return self._sessions.get(key)

  def RemoveIdentifier(self, key):
    """Removes a session identifier and its data.

    Args:
      key: (string) The session identifier.

    Returns:
      The session data removed or None
    """
    with self._mutex:
      return self._sessions.pop(key, None)

  def NewIdentifier(self, basename):
    """Adds a new empty session and gives it a new unique identifier.

    Args:
       basename: (string) The prefix for the generated identifier.

    Returns:
       The generated identifier to refer to the SessionData in the future.
    """
    with self._mutex:
      self._sequence_num += 1
      key = '%s-%s-%s' % (basename, self._nodeid, self._sequence_num)
      self._sessions[key] = SessionData()
    return key


REPOSITORY_ = Repository()


class WaxHandler(BaseHTTPRequestHandler):
  """Implements a Wax Service interface for the web server."""

  def _SendResponse(self, payload, http_code, content_type='text/plain'):
    """Send HTTP response.

    Args:
      payload: (string) The HTTP response body.
      http_code: (int) The HTTP response status code.
      content_type: (string) The HTTP Content-Type header.

    Returns:
      The http_code.
    """
    self.send_response(http_code)
    self.send_header('Content-Type', content_type)
    self.end_headers()
    # The socket file only accepts bytes on Python 3.
    if not isinstance(payload, bytes):
      payload = payload.encode('utf-8')
    self.wfile.write(payload)
    return http_code

  def _SendJsonObjectResponse(self, obj, http_code):
    """Sends object as a JSON-encoded HTTP response.

    Args:
      obj: (dict) The object to return
      http_code: (int) The HTTP response status code.

    Returns:
      The http_code.
    """
    return self._SendResponse(
        json.dumps(obj), http_code, content_type=_JSON_CONTENT_TYPE)

  def _SendJsonErrorResponse(self, msg, http_code):
    """Sends a JSON-encoded HTTP response containing the message.

    Args:
      msg: (string) The message to send (explaining error).
      http_code: (int) The HTTP response status code.

    Returns:
      The http_code.
    """
    return self._SendResponse(
        json.dumps({'message': msg, 'code': http_code}),
        http_code,
        content_type=_JSON_CONTENT_TYPE)

  def _ProcessNewSessionCommand(self):
    """Adds a new session and sends a response.

    Responds with JSON containing a newSessionId attribute.

    Returns:
      The final HTTP status code.
    """
    request_json = self._GetJson()
    if not request_json:
      return self._SendJsonErrorResponse('Invalid JSON', 400)
    session_name = request_json.get('sessionName')
    if not session_name:
      # Previously a missing key raised KeyError and killed the request.
      return self._SendJsonErrorResponse('JSON missing sessionName', 400)
    key = REPOSITORY_.NewIdentifier(session_name)
    return self._SendJsonObjectResponse(
        {'kind': 'wax#waxNewSession', 'newSessionId': key}, 200)

  def _ProcessRemoveSessionCommand(self):
    """Removes the session and sends a response.

    Responds with JSON containing a removeSessionId attribute.

    Returns:
      The final HTTP status code.
    """
    request_json = self._GetJson()
    if not request_json:
      return self._SendJsonErrorResponse('Invalid JSON', 400)
    # A missing sessionId simply fails the lookup below and yields a 404.
    key = request_json.get('sessionId')
    old_item = REPOSITORY_.RemoveIdentifier(key)
    if not old_item:
      return self._SendJsonErrorResponse('Unknown sessionId', 404)
    return self._SendJsonObjectResponse(
        {'kind': 'wax#waxRemoveSession', 'removeSessionId': key}, 200)

  def _ProcessGetSessionItemsCommand(self, session_data):
    """Sends a JSON response with the session items.

    Args:
      session_data: (SessionData) The SessionData instance

    Returns:
      The final HTTP status code.
    """
    items_copy = session_data.GetAllItemsCopy()
    return self._SendJsonObjectResponse(
        {'kind': 'wax#waxList', 'items': items_copy}, 200)

  def _ProcessInsertNewItemCommand(self, session_data):
    """Adds the item specified by the JSON payload and send a JSON response.

    Args:
      session_data: (SessionData) The SessionData instance

    Returns:
      The final HTTP status code.
    """
    request_json = self._GetJson()
    # TODO(user): 20130624
    # Distinguish {} from no payload.
    if not request_json:
      return self._SendJsonErrorResponse('Invalid JSON', 400)
    item_id = request_json.get('id', None)
    if not item_id:
      return self._SendJsonErrorResponse('JSON missing id', 400)
    added = session_data.AddNewItem(item_id, request_json)
    if added:
      time.sleep(3.0 / 1000.0)  # delay 3 ms so we can test timeouts
      return self._SendJsonObjectResponse(added, 200)
    else:
      return self._SendJsonErrorResponse('Item already exists', 403)

  def _ProcessGetItemCommand(self, session_data, item_id):
    """Returns a JSON response with the specified item.

    Args:
      session_data: (SessionData) The SessionData instance
      item_id: (string) The item id to return

    Returns:
      The final HTTP status code.
    """
    item_copy = session_data.GetItemCopy(item_id)
    if item_copy:
      return self._SendJsonObjectResponse(item_copy, 200)
    return self._SendJsonErrorResponse('Unknown item', 404)

  def _ProcessDeleteItemCommand(self, session_data, item_id):
    """Deletes the specified item and sends a response.

    Args:
      session_data: (SessionData) The SessionData instance.
      item_id: (string) The item id to delete.

    Returns:
      The final HTTP status code.
    """
    if session_data.DeleteItem(item_id):
      return self._SendResponse('', 204)
    else:
      return self._SendJsonErrorResponse('Unknown item in session', 404)

  def _ProcessPatchItemCommand(self, session_data, item_id, request_json):
    """Patches the specified item with the JSON payload and responds.

    Args:
      session_data: (SessionData) The SessionData instance.
      item_id: (string) The item id to patch.
      request_json: (dict) The value used to patch

    Returns:
      The final HTTP status code.
    """
    patched_item = session_data.PatchItem(item_id, request_json)
    if patched_item:
      return self._SendJsonObjectResponse(patched_item, 200)
    else:
      return self._SendJsonErrorResponse('Unknown item in session', 404)

  def _ProcessUpdateItemCommand(self, session_data, item_id, request_json):
    """Replaces the specified item with the JSON payload and responds.

    Args:
      session_data: (SessionData) The SessionData instance.
      item_id: (string) The item id to replace.
      request_json: (dict) The replacement value.

    Returns:
      The final HTTP status code.
    """
    # The id in the body is optional, but if given it must agree with the URL.
    request_json.setdefault('id', item_id)
    if request_json['id'] != item_id:
      return self._SendJsonErrorResponse('Mismatched item ids', 400)
    wax = session_data.ReplaceItem(item_id, request_json)
    return self._SendJsonObjectResponse(wax, 200)

  def _ReadChunkedPayload(self):
    """Reads a payload sent with a chunked Transfer-Encoding.

    Each chunk is '<hex size>[;extensions]\\r\\n<data>\\r\\n'; the body ends
    with a zero-size chunk followed by optional trailer lines and an empty
    line.

    Raises:
      ValueError: if the payload was not properly chunk encoded.

    Returns:
      The decoded payload (a byte string).
    """
    chunks = []
    while True:
      size_line = self.rfile.readline()
      if not size_line.endswith(_CRLF):
        raise ValueError('Expected \\r\\n termination')
      # Chunk extensions (anything after ';') carry no payload data.
      size_field = size_line[:-len(_CRLF)].split(b';', 1)[0].strip()
      if not _RE_CHUNK_SIZE.match(size_field):
        raise ValueError('Invalid chunk size')
      chunk_len = int(size_field, 16)
      if chunk_len == 0:
        break
      chunks.append(self.rfile.read(chunk_len))
      # The chunk data is itself CRLF-terminated. Failing to consume this
      # made the terminator look like an empty (final) chunk, so everything
      # after the first chunk used to be dropped.
      if self.rfile.read(len(_CRLF)) != _CRLF:
        raise ValueError('Expected \\r\\n termination')
    # Drain optional trailer headers up to the blank line ending the body.
    while True:
      trailer = self.rfile.readline()
      if trailer in (_CRLF, b'\n', b''):
        break
    return b''.join(chunks)

  def _GetJson(self):
    """Reads JSON object from payload.

    Returns:
      The decoded JSON object (a non-empty dict), or None if the payload is
      missing, empty, malformed, not a JSON object, or (for non-chunked
      requests) not sent with the JSON content type.
    """
    # .get() is available on the header objects of both Python 2 and 3.
    encoding = self.headers.get('Transfer-Encoding')
    if encoding == 'chunked':
      try:
        payload = self._ReadChunkedPayload()
      except ValueError:
        return None
    else:
      try:
        length = int(self.headers.get('Content-Length'))
      except (TypeError, ValueError):
        # Header absent or not a number: there is no readable payload.
        return None
      payload = self.rfile.read(length) if length > 0 else b''
      if (not payload
          or self.headers.get('Content-Type') != _JSON_CONTENT_TYPE):
        return None
    try:
      json_item = json.loads(payload.decode('utf-8'))
    except ValueError:
      # Covers both undecodable bytes and malformed JSON.
      return None
    # Callers index the result by key, so only JSON objects are acceptable.
    if not json_item or not isinstance(json_item, dict):
      return None
    return json_item

  def _DispatchMethod(self, method):
    """Executes WAX method and sends response.

    Args:
      method: (string) The HTTP method type received.

    Returns:
      The response HTTP status code sent.
    """
    if self.path == '/quit':
      _SERVER_SETTINGS['run'] = False
      return self._SendResponse('BYE', 200)
    if self.path == '/newsession' and method == 'POST':
      return self._ProcessNewSessionCommand()
    if self.path == '/removesession' and method == 'POST':
      return self._ProcessRemoveSessionCommand()
    # Try the more specific item pattern first: the session pattern is a
    # prefix of it and would match item URLs as well.
    match = _RE_ITEM_COMMAND.match(self.path)
    if match:
      return self._HandleItemMethod(method, match.group(1), match.group(2))
    match = _RE_SESSION_COMMAND.match(self.path)
    if match:
      return self._HandleSessionMethod(method, match.group(1))
    return self._SendJsonErrorResponse('URL Not Available', 404)

  def _HandleSessionMethod(self, method, session_id):
    """Executes method on wax sessions resource and sends response.

    Args:
      method: (string) HTTP method type.
      session_id: (string) Wax session identifier.

    Returns:
      HTTP status code.
    """
    session_data = REPOSITORY_.GetSessionData(session_id)
    if not session_data:
      return self._SendJsonErrorResponse('Unknown sessionId', 404)
    if method == 'GET':
      return self._ProcessGetSessionItemsCommand(session_data)
    if method == 'POST':
      return self._ProcessInsertNewItemCommand(session_data)
    return self._SendJsonErrorResponse('Unhandled method', 405)

  def _HandleItemMethod(self, method, session_id, item_id):
    """Executes method on wax items resource and sends response.

    Args:
      method: (string) HTTP method type.
      session_id: (string) Wax session identifier.
      item_id: (string) Wax item identifier.

    Returns:
      HTTP status code.
    """
    session_data = REPOSITORY_.GetSessionData(session_id)
    if not session_data:
      return self._SendJsonErrorResponse('Unknown sessionId', 404)
    if method == 'GET':
      return self._ProcessGetItemCommand(session_data, item_id)
    if method == 'DELETE':
      return self._ProcessDeleteItemCommand(session_data, item_id)
    # TODO(user): 20130624
    # Distinguish {} from no payload.
    request_json = self._GetJson()
    if not request_json:
      return self._SendJsonErrorResponse('Invalid JSON', 400)
    if method == 'PATCH':
      return self._ProcessPatchItemCommand(session_data, item_id, request_json)
    if method == 'PUT':
      return self._ProcessUpdateItemCommand(session_data, item_id, request_json)
    return self._SendJsonErrorResponse('Unhandled method', 405)

  def do_GET(self):     # pylint: disable=g-bad-name
    self._DispatchMethod('GET')

  def do_DELETE(self):  # pylint: disable=g-bad-name
    self._DispatchMethod('DELETE')

  def do_PATCH(self):   # pylint: disable=g-bad-name
    self._DispatchMethod('PATCH')

  def do_POST(self):    # pylint: disable=g-bad-name
    self._DispatchMethod('POST')

  def do_PUT(self):     # pylint: disable=g-bad-name
    self._DispatchMethod('PUT')


def main(argv):
  """Runs the program.

  Parses the command line, starts the HTTP server, optionally signals a
  waiting process that the server is ready, and then serves requests one at
  a time until a '/quit' request clears the run flag.

  Args:
    argv: (list of string) The command line, including the program name.
  """
  try:
    # Parse the argv we were handed (not sys.argv); 'g' is the only short
    # option and takes no argument.
    opts = getopt.getopt(argv[1:], 'g', ['port=', 'signal_pid='])[0]
  except getopt.GetoptError:
    print('%s: [-g] [--port=<port>] [--signal_pid=<pid>]' % argv[0])
    sys.exit(1)
  signal_pid = 0
  # Loopback address, so that by default only local clients can connect.
  host = '127.0.0.1'
  port = 5000
  for key, value in opts:
    if key == '--port':
      port = int(value)
      print('Running on port: %d' % port)
    elif key == '-g':
      host = '0.0.0.0'
    elif key == '--signal_pid':
      signal_pid = int(value)
  if host != '0.0.0.0':
    print('Only available on localhost (-g not provided)')
  server = HTTPServer((host, port), WaxHandler)
  print('Started WaxServer on %s:%s' % (host, port))
  if signal_pid != 0:
    print('Server ready -- signaling %d' % signal_pid)
    os.kill(signal_pid, signal.SIGUSR1)
  try:
    while _SERVER_SETTINGS['run']:
      server.handle_request()
  finally:
    # Release the listening socket even if a request handler blows up.
    server.server_close()


if __name__ == '__main__':
  # NOTE(review): the body of this guard is truncated in the available
  # source; presumably it calls main(sys.argv) -- confirm against upstream.
  main(sys.argv)
Source:syn  
...79        time.sleep(1)80        list_processes()81    # Very important to exit quickly, otherwise a ^C sent to us will also kill our child82    exit(0)83def signal_pid(pid, sig, english):84    account = account_from_pid(pid)85    if account is None:86        print("No such PID")87    else:88        os.kill(pid, sig)89        print(english,account)90        open(HISTORY_FILE,"at").write(time.ctime() + " " + english + " " + account + "\n")91        time.sleep(1)92        list_processes()93# We don't use RX, for now94g_connected_account = None95def command_response(packet):96    # Is it for us?97    if g_connected_account is None:98        return99    if not "headers" in packet:100        return101    if not "Instancename" in packet["headers"]:102        return103    if packet["headers"]["Instancename"] != g_connected_account:104        return105    if not "action" in packet:106        return107    if packet["action"] != "response":108        return109    print(packet["response"] + "\n> ",end="",flush=True)110def connect(pid):111    global g_connected_account112    zeromq_tx.init(emit_logging=False)  # Don't clutter-up screen with logging113    # zeromq_rx.init(command_response, emit_logging=False)114    g_connected_account = account_from_pid(pid)115    if g_connected_account is None:116        print("No such PID")117        return118    print("Connected to",g_connected_account, "- type 'exit'<return> to disconnect")119    zeromq_tx.socket_send( { "action" : "none" } )  # First packet seems to be ignored120    while True:121        inp = input("> ")122        if inp in ["exit","quit","disconnect","stop","leave"]:123            break124        inp = inp.strip()125        if len(inp) > 0:126            inp = inp.split(" ")127            packet = { "headers" : { "Instancename" : g_connected_account }, "action" : "command", "argv" : inp }128            zeromq_tx.socket_send(packet)129    print("Disconnected")130if __name__ == "__main__":131    if len(sys.argv) < 2:132 
       list_processes()133        print("\nFor available commands type '"+sys.argv[0]+" help'")134    else:135        if sys.argv[1] == "list":136            list_processes()137        elif sys.argv[1] == "watch":138            watch(int(sys.argv[2]))139        elif sys.argv[1] == "run":140            run(sys.argv[2], sys.argv[3])141        elif sys.argv[1] == "kill":142            signal_pid(int(sys.argv[2]), signal.SIGKILL, "Killed")143        elif sys.argv[1] == "pause":144            signal_pid(int(sys.argv[2]), signal.SIGUSR1, "Paused")145        elif sys.argv[1] == "unpause":146            signal_pid(int(sys.argv[2]), signal.SIGUSR2, "Unpaused")147        elif sys.argv[1] == "connect":148            connect(int(sys.argv[2]))149        elif sys.argv[1] == "history":150            print(open(HISTORY_FILE,"rt").read())151        elif sys.argv[1] == "help":152            print("Commands:")153            print("    list                        List all synth jobs, with their PID")154            print("    run <account> <scenario>    Run a synth job. <account> is one of ../synth_accounts and <scenario> is one of ./scenarios e.g. 'run OnFStest 10secs'")155            print("    watch <pid>                 Monitor the output from a synth job")156            print("    kill <pid>                  Kill a synth job")157            print("    pause <pid>                 Pause a synth job")158            print("    unpause <pid>               Unpause a synth job")159            print("    connect <pid>               Connect to a synth job and send commands")160            print("    history                     List all recent activities")...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. 
LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
