Best Python code snippet using hypothesis
processpool.py
Source:processpool.py  
...249        self._start_if_needed()250        if extra_args is None:251            extra_args = {}252        self._validate_all_known_args(extra_args)253        transfer_id = self._transfer_monitor.notify_new_transfer()254        download_file_request = DownloadFileRequest(255                transfer_id=transfer_id, bucket=bucket, key=key,256                filename=filename, extra_args=extra_args,257                expected_size=expected_size,258            )259        logger.debug(260            'Submitting download file request: %s.', download_file_request)261        self._download_request_queue.put(download_file_request)262        call_args = CallArgs(263            bucket=bucket, key=key, filename=filename, extra_args=extra_args,264            expected_size=expected_size)265        future = self._get_transfer_future(transfer_id, call_args)266        return future267    def shutdown(self):268        """Shutdown the downloader269        It will wait till all downloads are complete before returning.270        """271        self._shutdown_if_needed()272    def __enter__(self):273        return self274    def __exit__(self, exc_type, exc_value, *args):275        if isinstance(exc_value, KeyboardInterrupt):276            if self._transfer_monitor is not None:277                self._transfer_monitor.notify_cancel_all_in_progress()278        self.shutdown()279    def _start_if_needed(self):280        with self._start_lock:281            if not self._started:282                self._start()283    def _start(self):284        self._start_transfer_monitor_manager()285        self._start_submitter()286        self._start_get_object_workers()287        self._started = True288    def _validate_all_known_args(self, provided):289        for kwarg in provided:290            if kwarg not in ALLOWED_DOWNLOAD_ARGS:291                raise ValueError(292                    "Invalid extra_args key '%s', "293                    "must be one of: %s" % (294                        
kwarg, ', '.join(ALLOWED_DOWNLOAD_ARGS)))295    def _get_transfer_future(self, transfer_id, call_args):296        meta = ProcessPoolTransferMeta(297            call_args=call_args, transfer_id=transfer_id)298        future = ProcessPoolTransferFuture(299            monitor=self._transfer_monitor, meta=meta)300        return future301    def _start_transfer_monitor_manager(self):302        logger.debug('Starting the TransferMonitorManager.')303        self._manager = TransferMonitorManager()304        # We do not want Ctrl-C's to cause the manager to shutdown immediately305        # as worker processes will still need to communicate with it when they306        # are shutting down. So instead we ignore Ctrl-C and let the manager307        # be explicitly shutdown when shutting down the downloader.308        self._manager.start(_add_ignore_handler_for_interrupts)309        self._transfer_monitor = self._manager.TransferMonitor()310    def _start_submitter(self):311        logger.debug('Starting the GetObjectSubmitter.')312        self._submitter = GetObjectSubmitter(313            transfer_config=self._transfer_config,314            client_factory=self._client_factory,315            transfer_monitor=self._transfer_monitor,316            osutil=self._osutil,317            download_request_queue=self._download_request_queue,318            worker_queue=self._worker_queue319        )320        self._submitter.start()321    def _start_get_object_workers(self):322        logger.debug('Starting %s GetObjectWorkers.',323                     self._transfer_config.max_request_processes)324        for _ in range(self._transfer_config.max_request_processes):325            worker = GetObjectWorker(326                queue=self._worker_queue,327                client_factory=self._client_factory,328                transfer_monitor=self._transfer_monitor,329                osutil=self._osutil,330            )331            worker.start()332            self._workers.append(worker)333 
   def _shutdown_if_needed(self):334        with self._start_lock:335            if self._started:336                self._shutdown()337    def _shutdown(self):338        self._shutdown_submitter()339        self._shutdown_get_object_workers()340        self._shutdown_transfer_monitor_manager()341        self._started = False342    def _shutdown_transfer_monitor_manager(self):343        logger.debug('Shutting down the TransferMonitorManager.')344        self._manager.shutdown()345    def _shutdown_submitter(self):346        logger.debug('Shutting down the GetObjectSubmitter.')347        self._download_request_queue.put(SHUTDOWN_SIGNAL)348        self._submitter.join()349    def _shutdown_get_object_workers(self):350        logger.debug('Shutting down the GetObjectWorkers.')351        for _ in self._workers:352            self._worker_queue.put(SHUTDOWN_SIGNAL)353        for worker in self._workers:354            worker.join()355class ProcessPoolTransferFuture(BaseTransferFuture):356    def __init__(self, monitor, meta):357        """The future associated to a submitted process pool transfer request358        :type monitor: TransferMonitor359        :param monitor: The monitor associated to the proccess pool downloader360        :type meta: ProcessPoolTransferMeta361        :param meta: The metadata associated to the request. This object362            is visible to the requester.363        """364        self._monitor = monitor365        self._meta = meta366    @property367    def meta(self):368        return self._meta369    def done(self):370        return self._monitor.is_done(self._meta.transfer_id)371    def result(self):372        try:373            return self._monitor.poll_for_result(self._meta.transfer_id)374        except KeyboardInterrupt:375            # For the multiprocessing Manager, a thread is given a single376            # connection to reuse in communicating between the thread in the377            # main process and the Manager's process. 
If a Ctrl-C happens when378            # polling for the result, it will make the main thread stop trying379            # to receive from the connection, but the Manager process will not380            # know that the main process has stopped trying to receive and381            # will not close the connection. As a result if another message is382            # sent to the Manager process, the listener in the Manager383            # processes will not process the new message as it is still trying384            # trying to process the previous message (that was Ctrl-C'd) and385            # thus cause the thread in the main process to hang on its send.386            # The only way around this is to create a new connection and send387            # messages from that new connection instead.388            self._monitor._connect()389            self.cancel()390            raise391    def cancel(self):392        self._monitor.notify_exception(393            self._meta.transfer_id, CancelledError()394        )395class ProcessPoolTransferMeta(BaseTransferMeta):396    """Holds metadata about the ProcessPoolTransferFuture"""397    def __init__(self, transfer_id, call_args):398        self._transfer_id = transfer_id399        self._call_args = call_args400        self._user_context = {}401    @property402    def call_args(self):403        return self._call_args404    @property405    def transfer_id(self):406        return self._transfer_id407    @property408    def user_context(self):409        return self._user_context410class ClientFactory(object):411    def __init__(self, client_kwargs=None):412        """Creates S3 clients for processes413        Botocore sessions and clients are not pickleable so they cannot be414        inherited across Process boundaries. 
Instead, they must be instantiated415        once a process is running.416        """417        self._client_kwargs = client_kwargs418        if self._client_kwargs is None:419            self._client_kwargs = {}420        client_config = deepcopy(self._client_kwargs.get('config', Config()))421        if not client_config.user_agent_extra:422            client_config.user_agent_extra = PROCESS_USER_AGENT423        else:424            client_config.user_agent_extra += " " + PROCESS_USER_AGENT425        self._client_kwargs['config'] = client_config426    def create_client(self):427        """Create a botocore S3 client"""428        return botocore.session.Session().create_client(429            's3', **self._client_kwargs)430class TransferMonitor(object):431    def __init__(self):432        """Monitors transfers for cross-proccess communication433        Notifications can be sent to the monitor and information can be434        retrieved from the monitor for a particular transfer. This abstraction435        is ran in a ``multiprocessing.managers.BaseManager`` in order to be436        shared across processes.437        """438        # TODO: Add logic that removes the TransferState if the transfer is439        #  marked as done and the reference to the future is no longer being440        #  held onto. Without this logic, this dictionary will continue to441        #  grow in size with no limit.442        self._transfer_states = {}443        self._id_count = 0444        self._init_lock = threading.Lock()445    def notify_new_transfer(self):446        with self._init_lock:447            transfer_id = self._id_count448            self._transfer_states[transfer_id] = TransferState()449            self._id_count += 1450            return transfer_id451    def is_done(self, transfer_id):452        """Determine a particular transfer is complete453        :param transfer_id: Unique identifier for the transfer454        :return: True, if done. 
False, otherwise.455        """456        return self._transfer_states[transfer_id].done457    def notify_done(self, transfer_id):458        """Notify a particular transfer is complete459        :param transfer_id: Unique identifier for the transfer...volume_transfer.py
Source:volume_transfer.py  
...24from cinder.i18n import _, _LI25from cinder import transfer as transferAPI26from cinder import utils27LOG = logging.getLogger(__name__)28def make_transfer(elem):29    elem.set('id')30    elem.set('volume_id')31    elem.set('created_at')32    elem.set('name')33    elem.set('auth_key')34class TransferTemplate(xmlutil.TemplateBuilder):35    def construct(self):36        root = xmlutil.TemplateElement('transfer', selector='transfer')37        make_transfer(root)38        alias = Volume_transfer.alias39        namespace = Volume_transfer.namespace40        return xmlutil.MasterTemplate(root, 1, nsmap={alias: namespace})41class TransfersTemplate(xmlutil.TemplateBuilder):42    def construct(self):43        root = xmlutil.TemplateElement('transfers')44        elem = xmlutil.SubTemplateElement(root, 'transfer',45                                          selector='transfers')46        make_transfer(elem)47        alias = Volume_transfer.alias48        namespace = Volume_transfer.namespace49        return xmlutil.MasterTemplate(root, 1, nsmap={alias: namespace})50class CreateDeserializer(wsgi.MetadataXMLDeserializer):51    def default(self, string):52        dom = utils.safe_minidom_parse_string(string)53        transfer = self._extract_transfer(dom)54        return {'body': {'transfer': transfer}}55    def _extract_transfer(self, node):56        transfer = {}57        transfer_node = self.find_first_child_named(node, 'transfer')58        attributes = ['volume_id', 'name']59        for attr in attributes:60            if transfer_node.getAttribute(attr):61                transfer[attr] = transfer_node.getAttribute(attr)62        return transfer63class AcceptDeserializer(wsgi.MetadataXMLDeserializer):64    def default(self, string):65        dom = utils.safe_minidom_parse_string(string)66        transfer = self._extract_transfer(dom)67        return {'body': {'accept': transfer}}68    def _extract_transfer(self, node):69        transfer = {}70        transfer_node = 
self.find_first_child_named(node, 'accept')71        attributes = ['auth_key']72        for attr in attributes:73            if transfer_node.getAttribute(attr):74                transfer[attr] = transfer_node.getAttribute(attr)75        return transfer76class VolumeTransferController(wsgi.Controller):77    """The Volume Transfer API controller for the OpenStack API."""78    _view_builder_class = transfer_view.ViewBuilder79    def __init__(self):80        self.transfer_api = transferAPI.API()81        super(VolumeTransferController, self).__init__()82    @wsgi.serializers(xml=TransferTemplate)83    def show(self, req, id):84        """Return data about active transfers."""85        context = req.environ['cinder.context']86        try:87            transfer = self.transfer_api.get(context, transfer_id=id)88        except exception.TransferNotFound as error:89            raise exc.HTTPNotFound(explanation=error.msg)90        return self._view_builder.detail(req, transfer)91    @wsgi.serializers(xml=TransfersTemplate)92    def index(self, req):93        """Returns a summary list of transfers."""94        return self._get_transfers(req, is_detail=False)95    @wsgi.serializers(xml=TransfersTemplate)96    def detail(self, req):97        """Returns a detailed list of transfers."""98        return self._get_transfers(req, is_detail=True)99    def _get_transfers(self, req, is_detail):100        """Returns a list of transfers, transformed through view builder."""101        context = req.environ['cinder.context']102        filters = req.params.copy()103        LOG.debug('Listing volume transfers')104        transfers = self.transfer_api.get_all(context, filters=filters)105        transfer_count = len(transfers)106        limited_list = common.limited(transfers, req)107        if is_detail:108            transfers = self._view_builder.detail_list(req, limited_list,109                                                       transfer_count)110        else:111            transfers 
= self._view_builder.summary_list(req, limited_list,112                                                        transfer_count)113        return transfers114    @wsgi.response(202)115    @wsgi.serializers(xml=TransferTemplate)116    @wsgi.deserializers(xml=CreateDeserializer)117    def create(self, req, body):118        """Create a new volume transfer."""119        LOG.debug('Creating new volume transfer %s', body)120        if not self.is_valid_body(body, 'transfer'):121            raise exc.HTTPBadRequest()122        context = req.environ['cinder.context']123        try:124            transfer = body['transfer']125            volume_id = transfer['volume_id']126        except KeyError:127            msg = _("Incorrect request body format")128            raise exc.HTTPBadRequest(explanation=msg)129        name = transfer.get('name', None)130        LOG.info(_LI("Creating transfer of volume %s"),131                 volume_id,132                 context=context)133        try:134            new_transfer = self.transfer_api.create(context, volume_id, name)135        except exception.InvalidVolume as error:136            raise exc.HTTPBadRequest(explanation=error.msg)137        except exception.VolumeNotFound as error:138            raise exc.HTTPNotFound(explanation=error.msg)139        transfer = self._view_builder.create(req,140                                             dict(new_transfer.iteritems()))141        return transfer142    @wsgi.response(202)143    @wsgi.serializers(xml=TransferTemplate)144    @wsgi.deserializers(xml=AcceptDeserializer)145    def accept(self, req, id, body):146        """Accept a new volume transfer."""147        transfer_id = id148        LOG.debug('Accepting volume transfer %s', transfer_id)149        if not self.is_valid_body(body, 'accept'):150            raise exc.HTTPBadRequest()151        context = req.environ['cinder.context']152        try:153            accept = body['accept']154            auth_key = accept['auth_key']155      
  except KeyError:156            msg = _("Incorrect request body format")157            raise exc.HTTPBadRequest(explanation=msg)158        LOG.info(_LI("Accepting transfer %s"), transfer_id,159                 context=context)160        try:161            accepted_transfer = self.transfer_api.accept(context, transfer_id,162                                                         auth_key)163        except exception.VolumeSizeExceedsAvailableQuota as error:164            raise exc.HTTPRequestEntityTooLarge(165                explanation=error.msg, headers={'Retry-After': 0})166        except exception.InvalidVolume as error:167            raise exc.HTTPBadRequest(explanation=error.msg)168        transfer = \169            self._view_builder.summary(req,170                                       dict(accepted_transfer.iteritems()))171        return transfer172    def delete(self, req, id):173        """Delete a transfer."""174        context = req.environ['cinder.context']175        LOG.info(_LI("Delete transfer with id: %s"), id, context=context)176        try:177            self.transfer_api.delete(context, transfer_id=id)178        except exception.TransferNotFound as error:179            raise exc.HTTPNotFound(explanation=error.msg)180        return webob.Response(status_int=202)181class Volume_transfer(extensions.ExtensionDescriptor):182    """Volume transfer management support."""183    name = "VolumeTransfer"184    alias = "os-volume-transfer"185    namespace = "http://docs.openstack.org/volume/ext/volume-transfer/" + \186                "api/v1.1"187    updated = "2013-05-29T00:00:00+00:00"188    def get_resources(self):189        resources = []190        res = extensions.ResourceExtension(Volume_transfer.alias,191                                           VolumeTransferController(),192                                           collection_actions={'detail':193                                                               'GET'},194                             
              member_actions={'accept': 'POST'})195        resources.append(res)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
