Best Python code snippet using autotest_python
4035abe4ee651b035771cc9a5bc307f41e33e0ddnode.py
Source:4035abe4ee651b035771cc9a5bc307f41e33e0ddnode.py  
1""" The Node class """2import zmq3import logging4import tempfile5from zerotask.server import Server6from zerotask.task import task7from zerotask import jsonrpc8from zerotask.exceptions import JSONRPCError9from zerotask.worker import Worker10from multiprocessing import Process11import optparse12class Node(Server):13    """ A simple example server """14    namespace = "zerotask.node"15    def __init__(self, **kwargs):16        self.node_id = kwargs.get("node_id", None)17        self.running_tasks = 018        self.workers = []19        self.worker_count = kwargs.get("workers", 1) # 1 worker by default20        Server.__init__(self)21    def setup(self):22        """ Sets up the handlers """23        self.broker_req_socket = None24        self.broker_sub_socket = None25        self._push_file = tempfile.NamedTemporaryFile(prefix="zerotaskq-")26        self._pull_file = tempfile.NamedTemporaryFile(prefix="zerotaskr-")27        self.push_socket = self.context.socket(zmq.PUSH)28        self.push_socket.bind("ipc://%s" % self._push_file.name)29        self.pull_socket = self.context.socket(zmq.PULL)30        self.pull_socket.bind("ipc://%s" % self._pull_file.name)31        self.add_callback(self.pull_socket, self.worker_task_result)32        self.add_handler(self.task_ready)33        self.add_handler(self.request_status)34    def teardown(self):35        """ Closes temp files """36        logging.info("Closing temporary files")37        self._push_file.close()38        self._pull_file.close()39    def start(self):40        """ Checks if broker is setup, then starts the loop """41        if not self.broker_req_socket or not self.broker_sub_socket:42            self.add_broker("tcp://127.0.0.1:5555", "tcp://127.0.0.1:5556")43        logging.info("Starting up %s workers...", self.worker_count)44        for i in range(self.worker_count):45            push_file = self._push_file.name46            pull_file = self._pull_file.name47            def worker_func():48                worker = Worker(queue=push_file, result=pull_file,49                                name="worker-%d" % i)50                worker.start()51            process = Process(target=worker_func)52            self.workers.append(process)53            process.start()54        Server.start(self)55    def add_broker(self, broker_req_uri, broker_sub_uri):56        """ Sets the (only) broker """57        self.broker_req_socket = self.context.socket(zmq.REQ)58        self.broker_sub_socket = self.context.socket(zmq.SUB)59        self.broker_req_socket.connect(broker_req_uri)60        self.broker_sub_socket.connect(broker_sub_uri)61        self.broker_sub_socket.setsockopt(zmq.SUBSCRIBE, "")62        # Getting node id from broker63        connect_method = "zerotask.broker.node_connect"64        connect_request = jsonrpc.request(connect_method)65        self.broker_req_socket.send_json(connect_request)66        connect_result = self.broker_req_socket.recv_json()67        if connect_result.has_key("error"):68            connect_error = connect_result["error"]69            raise JSONRPCError(connect_error["code"],70                               connect_error.get("message"))71        node_id = connect_result.get("result")72        if not node_id:73            raise JSONRPCError(jsonrpc.INVALID_NODE_ID)74        self.node_id = node_id75        logging.info("New node id: %s", node_id)76        self.add_callback(self.broker_sub_socket, self.subscribe)77    def subscribe(self, message):78        """ Special dispatching """79        logging.info("Received message %s", message)80        result = self.dispatcher.dispatch(message)81        if result:82            self.broker_req_socket.send_json(result)83            self.broker_req_socket.recv_json()84    def worker_task_result(self, result):85        """ Reports a task result back to broker """86        self.running_tasks -= 187        if not result:88            logging.warning("Why do we have an empty result??")89            return90        task_id = result.get("id")91        result_method = "zerotask.broker.node_task_finished"92        result_params = dict(node_id=self.node_id,93                             task_id=task_id)94        if result.has_key("error"):95            result_method = "zerotask.broker.node_task_failed"96            result_params["error"] = result["error"]97        else:98            result_params["result"] = result["result"]99        result_req = jsonrpc.request(method=result_method,100                                     params=result_params,101                                     id=None) # a notification102        self.broker_req_socket.send_json(result_req)103        self.broker_req_socket.recv_json()104    # Tasks...105    def task_ready(self, method, task_id):106        """ Checks if node support the method, and responds if so. """107        if not self.dispatcher.has_handler(method):108            logging.info("Ignoring -- method %s is not supported.", method)109            return None # we don't support that method110        logging.info("Responding for new task %s", method)111        req_params = dict(node_id=self.node_id, task_id=task_id)112        req_method = "zerotask.broker.node_task_request"113        request = jsonrpc.request(req_method, req_params)114        self.broker_req_socket.send_json(request)115        result = self.broker_req_socket.recv_json()116        if result.has_key("error"):117            raise JSONRPCError(result["error"]["code"],118                               result["error"].get("message"))119        task_result = result.get("result")120        if not task_result:121            return None # we were not elected122        method = task_result['method']123        params = task_result.get("params", [])124        request = jsonrpc.request(method, params, id=task_id)125        logging.info("Assigning new request: %s", request)126        self.push_socket.send_json(request)127        self.running_tasks += 1128    def request_status(self):129        """ Calls broker with the current node status """130        notify_method = "zerotask.broker.node_status"131        notify_params = dict(node_id=self.node_id,132                             workers=1)133        notification = jsonrpc.request(method=notify_method,134                                       params=notify_params,135                                       id=None) # notification136        logging.info("Sending status message: %s" % notification)137        self.broker_req_socket.send_json(notification)138        self.broker_req_socket.recv_json()139        return None140def main():141    """ Setup up basic node with add / subtract methods. """142    options = optparse.OptionParser()143    options.add_option("-w", "--workers", dest="workers", type="int",144                       default=1, help="the number of worker procs")145    options.add_option("-r", "--request_port", dest="req_port", type="int",146                       default=5555, help="the broker's request port")147    options.add_option("-s", "--subscribe_port", dest="sub_port", type="int",148                       default=5556, help="the broker's subscribe port")149    options.add_option("-a", "--address", dest="address",150                       default="*", help="the broker's address")151    options.add_option("-l", "--loglevel", dest="loglevel",152                       default="WARNING", help="INFO|WARNING|ERROR")153    opts, args = options.parse_args()154    logging.getLogger().setLevel(getattr(logging, opts.loglevel))155    logging.info("Starting with broker request port %d", opts.req_port)156    logging.info("Starting with broker subscribe port %d", opts.sub_port)157    @task158    def add(first, second):159        """ just an addition method test """160        return first + second161    @task(name="subtract")162    def subtract_method(first, second):163        """ Testing task name attribute """164        return first - second165    node = Node(workers=opts.workers)166    broker_req_uri = "tcp://%s:%s" % (opts.address, opts.req_port)167    broker_sub_uri = "tcp://%s:%s" % (opts.address, opts.sub_port)168    node.add_broker(broker_req_uri, broker_sub_uri)169    node.start()170if __name__ == "__main__":...lxd.py
Source:lxd.py  
...37        self._server = server38    def _push_file(self, src, dst):39        check_call(['lxc', 'file', 'push',40                    src, '{}/{}'.format(self._container_name, dst)])41    def _pull_file(self, src, dst):42        check_call(['lxc', 'file', 'pull',43                    '{}/{}'.format(self._container_name, src), dst])44    def _container_run(self, cmd):45        check_call(['lxc', 'exec', self._container_name, '--'] + cmd)46    @contextmanager47    def _create_container(self):48        try:49            remote_tmp = petname.Generate(2, '-')50            check_call(['lxc', 'remote', 'add', remote_tmp, self._server])51            check_call([52                'lxc', 'launch', '-e',53                '{}:ubuntu/xenial/{}'.format(54                    remote_tmp, self._project_options.deb_arch),55                self._container_name])56            yield57        finally:58            # Stopping takes a while and lxc doesn't print anything.59            print('Stopping {}'.format(self._container_name))60            check_call(['lxc', 'stop', '-f', self._container_name])61            check_call(['lxc', 'remote', 'remove', remote_tmp])62    def execute(self):63        with self._create_container():64            self._setup_project()65            self._wait_for_network()66            self._container_run(['apt-get', 'update'])67            self._container_run(['apt-get', 'install', 'snapcraft', '-y'])68            try:69                self._container_run(70                    ['snapcraft', 'snap', '--output', self._snap_output])71            except CalledProcessError as e:72                if self._project_options.debug:73                    logger.info('Debug mode enabled, dropping into a shell')74                    self._container_run(['bash', '-i'])75                else:76                    raise e77            else:78                self._pull_snap()79    def _setup_project(self):80        logger.info('Setting up container with project assets')81        dst = os.path.join('/root', os.path.basename(self._tar_filename))82        self._push_file(self._tar_filename, dst)83        self._container_run(['tar', 'xvf', dst])84    def _pull_snap(self):85        src = os.path.join('/root', self._snap_output)86        self._pull_file(src, self._snap_output)87        logger.info('Retrieved {}'.format(self._snap_output))88    def _wait_for_network(self):89        logger.info('Waiting for a network connection...')90        not_connected = True91        retry_count = 592        while not_connected:93            sleep(5)94            try:95                self._container_run(['python3', '-c', _NETWORK_PROBE_COMMAND])96                not_connected = False97            except CalledProcessError as e:98                retry_count -= 199                if retry_count == 0:100                    raise e...archive_requests.py
Source:archive_requests.py  
...26        if retry is None:27            retry = self.default_retry_count28        while retry:29            try:30                self._pull_file(archive_filename, cart_filepath, hashval, hashtype)31                retry = 032            except (requests.exceptions.RequestException, ValueError) as ex:33                if retry == 1:34                    raise ex35                sleep(self.default_retry_sleep)36                retry -= 137    # pylint: enable=too-many-arguments38    def _pull_file(self, archive_filename, cart_filepath, hashval, hashtype):39        xfer_size = parse_size(get_config().get('cartd', 'transfer_size'))40        resp = requests.get(str(self._url + archive_filename), stream=True)41        if int(resp.status_code/100) == 5:42            raise requests.exceptions.RequestException('Status code is 500')43        myfile = open(cart_filepath, 'wb+')44        buf = resp.raw.read(xfer_size)45        myhash = hashlib.new(hashtype)46        while buf:47            myfile.write(buf)48            myhash.update(buf)49            buf = resp.raw.read(xfer_size)50        myfile.close()51        myhashval = myhash.hexdigest()52        if myhashval != hashval:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
