Best Python code snippet using tempest_python
download_manager_test.py
Source:download_manager_test.py  
...103    self.fs.__exit__(None, None, None)104  def _write_info(self, path, info):105    content = json.dumps(info)106    self.fs.add_file(path, content)107  def _get_manager(108      self,109      register_checksums=True,110      url_infos=None,111      dl_dir='/dl_dir',112      extract_dir='/extract_dir',113      **kwargs114  ):115    manager = dm.DownloadManager(116        dataset_name='mnist',117        download_dir=dl_dir,118        extract_dir=extract_dir,119        manual_dir='/manual_dir',120        register_checksums=register_checksums,121        **kwargs122    )123    if url_infos:124      manager._url_infos = url_infos125    return manager126  def test_download(self):127    """One file in cache, one not."""128    a, b, c = [Artifact(i) for i in 'abc']129    # File `a` is cached130    self.fs.add_file(a.file_path)131    self.fs.add_file(a.file_path + '.INFO')132    # INFO file of c has been deleted:133    self.fs.add_file(c.file_path)134    self.dl_results[b.url] = b.url_info135    self.dl_results[c.url] = c.url_info136    manager = self._get_manager(url_infos={137        art.url: art.url_info for art in (a, b, c)138    })139    downloads = manager.download({140        'cached': a.url,141        'new': b.url,142        'info_deleted': c.url,143    })144    expected = {145        'cached': a.file_path,146        'new': b.file_path,147        'info_deleted': c.file_path,148    }149    self.assertEqual(downloads, expected)150    # A isn't downloaded as already cached151    # C is re-downloaded as incomplete152    self.assertCountEqual(self.downloaded_urls, {b.url, c.url})153    self.assertEqual(  # Downloaded size include cached downloads154        manager.downloaded_size, sum([art.url_info.size for art in (a, b, c)]))155  def test_extract(self):156    """One file already extracted, one file with NO_EXTRACT, one to extract."""157    cached = resource_lib.Resource(path='/dl_dir/cached', extract_method=ZIP)158    new_ = resource_lib.Resource(path='/dl_dir/new', extract_method=TAR)159    no_extract = resource_lib.Resource(path='/dl_dir/noextract',160                                       extract_method=NO_EXTRACT)161    self.fs.add_file('/extract_dir/ZIP.cached')162    self.extract_results['/dl_dir/new'] = '/extract_dir/TAR.new'163    manager = self._get_manager()164    res = manager.extract({165        'cached': cached,166        'new': new_,167        'noextract': no_extract,168    })169    expected = {170        'cached': '/extract_dir/ZIP.cached',171        'new': '/extract_dir/TAR.new',172        'noextract': '/dl_dir/noextract',173    }174    self.assertEqual(res, expected)175    self.assertCountEqual(self.extracted_paths, ['/dl_dir/new'])176  def test_extract_twice_parallel(self):177    # Make sure calling extract twice on same resource actually does the178    # extraction once.179    self.extract_results['/dl_dir/foo.tar'] = '/extract_dir/TAR.foo'180    manager = self._get_manager()181    out1 = manager.extract(['/dl_dir/foo.tar', '/dl_dir/foo.tar'])182    out2 = manager.extract('/dl_dir/foo.tar')183    self.assertEqual(out1, ['/extract_dir/TAR.foo', '/extract_dir/TAR.foo'])184    self.assertEqual(out2, '/extract_dir/TAR.foo')185    # Result is memoize so extract has only been called once186    self.assertCountEqual(self.extracted_paths, ['/dl_dir/foo.tar'])187  def test_download_and_extract(self):188    a, b = Artifact('a.zip'), Artifact('b')189    self.dl_results[a.url] = a.url_info190    self.dl_results[b.url] = b.url_info191    self.extract_results[a.file_path] = f'/extract_dir/ZIP.{a.file_name}'192    # url_b doesn't need any extraction.193    # Result is the same after caching:194    manager = self._get_manager(url_infos={195        a.url: a.url_info,196        b.url: b.url_info,197    })198    res = manager.download_and_extract({'a': a.url, 'b': b.url})199    self.assertEqual(res, {200        'a': '/extract_dir/ZIP.%s' % a.file_name,201        'b': b.file_path,202    })203  def test_download_and_extract_archive_ext_in_fname(self):204    # Make sure extraction method is properly deduced from original fname, and205    # not from URL.206    a = Artifact('a', url='http://a?key=1234')207    self.dl_results[a.url] = a.url_info208    self.dl_fnames[a.url] = 'abc.zip'209    self.extract_results[a.file_path] = f'/extract_dir/ZIP.{a.file_name}'210    manager = self._get_manager(url_infos={211        a.url: a.url_info,212    })213    res = manager.download_and_extract({'a': a.url})214    self.assertEqual(res, {215        'a': '/extract_dir/ZIP.%s' % a.file_name,216    })217  def test_download_and_extract_already_downloaded(self):218    a = Artifact('a')  # Extract can't be deduced from the url, but from .INFO219    # File was already downloaded:220    self.fs.add_file(a.file_path)221    self._write_info(a.file_path + '.INFO', {'original_fname': 'a.zip'})222    self.extract_results[a.file_path] = f'/extract_dir/ZIP.{a.file_name}'223    manager = self._get_manager(url_infos={224        a.url: a.url_info,225    })226    res = manager.download_and_extract(a.url)227    self.assertEqual(res, f'/extract_dir/ZIP.{a.file_name}')228    # No url downloaded, but file extracted.229    self.assertCountEqual(self.downloaded_urls, [])230    self.assertCountEqual(self.extracted_paths, [a.file_path])231  def test_force_download_and_extract(self):232    a = Artifact('a.tar.gz')233    self.dl_results[a.url] = a.url_info234    self.extract_results[a.file_path] = (235        f'/extract_dir/TAR_GZ.{a.file_name}')236    # Old content already exists237    self.fs.files = {238        a.file_path: 'old content',239        a.file_path + '.INFO': '{}',240        f'/extract_dir/TAR_GZ.{a.file_name}': 'old content',241    }242    # Redownloading the data overwrite the content243    manager = self._get_manager(244        force_download=True,245        force_extraction=True,246        url_infos={247            a.url: a.url_info,248        })249    res = manager.download_and_extract(a.url)250    self.assertEqual(res, f'/extract_dir/TAR_GZ.{a.file_name}')251    self.assertCountEqual(self.downloaded_urls, [a.url])252    self.assertCountEqual(self.extracted_paths, [a.file_path])253    self.assertNotEqual(a.file_path, 'old content')254    self.assertNotEqual(a.file_path + '.INFO', '{}')255    self.assertNotEqual(f'/extract_dir/TAR_GZ.{a.file_name}', 'old content')256  def test_wrong_checksum(self):257    a = Artifact('a.tar.gz')258    sha_b = _sha256('content of another file')259    self.dl_results[a.url] = a.url_info260    manager = self._get_manager(261        register_checksums=False,262        url_infos={263            a.url: checksums_lib.UrlInfo(size=a.url_info.size, checksum=sha_b),264        },265    )266    with self.assertRaises(dm.NonMatchingChecksumError):267      manager.download(a.url)268  def test_pickle(self):269    dl_manager = self._get_manager(register_checksums=False)270    pickle.loads(pickle.dumps(dl_manager))271    dl_manager = self._get_manager(register_checksums=True)272    with self.assertRaisesRegex(273        NotImplementedError, '`register_checksums` must be disabled'):274      pickle.dumps(dl_manager)275  def test_force_checksums_validation(self):276    """Tests for download manager with checksums."""277    dl_manager = self._get_manager(278        force_checksums_validation=True,279        register_checksums=False,280    )281    a = Artifact('x')282    self.dl_results[a.url] = a.url_info283    with self.assertRaisesRegex(ValueError, 'Missing checksums url'):284      dl_manager.download(a.url)285  def test_download_cached(self):286    """Tests that the URL is downloaded only once."""287    a = Artifact('x')288    self.dl_results[a.url] = a.url_info289    # Download the URL290    dl_manager = self._get_manager(291        register_checksums=False,292    )293    self.assertEqual(dl_manager.download(a.url), a.url_path)294    self.assertCountEqual(self.downloaded_urls, [a.url])295    self.assertCountEqual(self.fs.files, [a.url_path, a.url_path + '.INFO'])296    # Reuse downloaded cache297    dl_manager = self._get_manager(298        register_checksums=False,299    )300    self.assertEqual(dl_manager.download(a.url), a.url_path)301    self.assertCountEqual(self.downloaded_urls, [a.url])302    self.assertCountEqual(self.fs.files, [a.url_path, a.url_path + '.INFO'])303    # Reuse downloaded cache, even if url_info is present304    dl_manager = self._get_manager(305        register_checksums=False,306        url_infos={a.url: a.url_info},307    )308    self.assertEqual(dl_manager.download(a.url), a.url_path)309    self.assertCountEqual(self.downloaded_urls, [a.url])310    self.assertCountEqual(self.fs.files, [a.url_path, a.url_path + '.INFO'])311    # Reuse downloaded cache and register the checksums312    dl_manager = self._get_manager(313        register_checksums=True,  # <<< Register checksums !!!314    )315    self.assertEqual(dl_manager.download(a.url), a.file_path)316    self.assertCountEqual(self.downloaded_urls, [a.url])317    # The files have been renamed `url_path` -> `file_path`318    self.assertCountEqual(self.fs.files, [a.file_path, a.file_path + '.INFO'])319    # After checksums have been registered, `file_path` is used320    dl_manager = self._get_manager(321        register_checksums=False,322        url_infos={a.url: a.url_info},323    )324    self.assertEqual(dl_manager.download(a.url), a.file_path)325    self.assertCountEqual(self.downloaded_urls, [a.url])326    self.assertCountEqual(self.fs.files, [a.file_path, a.file_path + '.INFO'])327    # Registering checksums twice still reuse the cached `file_path`328    dl_manager = self._get_manager(329        register_checksums=True,  # <<< Re-register checksums...330        url_infos={a.url: a.url_info},  # ...but checksums already known331    )332    self.assertEqual(dl_manager.download(a.url), a.file_path)333    self.assertCountEqual(self.downloaded_urls, [a.url])  # Still one download334    self.assertCountEqual(self.fs.files, [a.file_path, a.file_path + '.INFO'])335    # Checksums unknown, so `file_path` unknown, re-downloading336    dl_manager = self._get_manager(337        register_checksums=False,338    )339    self.assertEqual(dl_manager.download(a.url), a.url_path)340    self.assertCountEqual(self.downloaded_urls, [a.url, a.url])  # Re-download!!341    self.assertCountEqual(self.fs.files, [342        a.url_path,343        a.url_path + '.INFO',344        a.file_path,  # `file_path` still exists from previous download345        a.file_path + '.INFO',346    ])347  def test_download_cached_checksums_error(self):348    """Tests that the download is cached, even if record_checksums fails."""349    a = Artifact('x')350    self.dl_results[a.url] = a.url_info351    class StoreChecksumsError(Exception):352      pass353    dl_manager = self._get_manager(354        register_checksums=True,355    )356    with absltest.mock.patch.object(357        checksums_lib, 'store_checksums', side_effect=StoreChecksumsError()):358      with self.assertRaises(StoreChecksumsError):359        dl_manager.download(a.url)360    # Even after failure, the file was properly downloaded361    self.assertCountEqual(self.downloaded_urls, [a.url])362    self.assertCountEqual(self.fs.files, [a.url_path, a.url_path + '.INFO'])363    # When the user retry, it should suceed without redownloading the file364    dl_manager = self._get_manager(365        register_checksums=True,366    )367    self.assertEqual(dl_manager.download(a.url), a.file_path)368    self.assertCountEqual(self.downloaded_urls, [a.url])369    # The files have been renamed `url_path` -> `file_path`370    self.assertCountEqual(self.fs.files, [a.file_path, a.file_path + '.INFO'])371  def test_download_url_info_in_info_file_missmatch(self):372    """Tests failure when downloaded checksums and `.INFO` mismatch."""373    a = Artifact('x')374    self.dl_results[a.url] = a.url_info375    # Download the url once376    dl_manager = self._get_manager(register_checksums=False)377    dl_manager.download(a.url)378    # The second time, download the url with a different checksum379    self.dl_results[a.url] = checksums_lib.UrlInfo(380        size=a.url_info.size,381        checksum=_sha256('Other content'),382    )383    dl_manager = self._get_manager(384        register_checksums=False,385        force_download=True,386    )387    with self.assertRaisesRegexp(ValueError, 'contains a different checksum'):388      dl_manager.download(a.url)389    # If the url is re-downloaded with the same hash, no error is raised390    self.dl_results[a.url] = a.url_info391    dl_manager = self._get_manager(392        register_checksums=False,393        force_download=True,394    )395    dl_manager.download(a.url)396if __name__ == '__main__':...__init__.py
Source:__init__.py  
...7    result = []8    for r in type_attributes:9        result.append(r['attname'])10    return result11def _get_manager(plpy, hostname):12    import asterisk.manager13    plan = plpy.prepare("SELECT * FROM asterisk.managers WHERE host = $1", [ "text" ])14    r = plpy.execute(plan, [hostname], 1)15    r = r[0]16    manager = asterisk.manager.Manager()17    manager.connect(r['host'])18    manager.login(r['login'], r['passwd'])19    manager.event_registry = []20    return manager21def _handle_queue_member(event, manager):22    manager.event_registry.append(event)23def _handle_queue_entry(event, manager):24    manager.event_registry.append(event)25def _handle_queue_parameter(event, manager):26    manager.event_registry.append(event)27def _handle_peerentry(event, manager):28    manager.event_registry.append(event)29def queue_members(plpy, ami_host, queue):30    manager = _get_manager(plpy, ami_host)31    manager.register_event('QueueMember', _handle_queue_member)32    manager.send_action({ACTION : 'QueueStatus', QUEUE : queue})33    manager.logoff()34    return_type_attributes = _get_type_fields(plpy,'asterisk_queue_member')35    #plpy.error("error")36    #plpy.fatal("fatal")37    #plpy.debug("debug")38    #plpy.notice("notice")39    result = []40    for event in manager.event_registry:41        record = {}42        for sip_header in return_type_attributes:43            record[sip_header] = event.get_header(sip_header, None)44        result.append(record)45    return result46def queue_entries(plpy, ami_host, queue):47    manager = _get_manager(plpy, ami_host)48    manager.register_event('QueueEntry', _handle_queue_entry)49    manager.send_action({ACTION : 'QueueStatus', QUEUE : queue})50    manager.logoff()51    return_type_attributes = _get_type_fields(plpy,'asterisk_queue_entry')52    result = []53    for event in manager.event_registry:54        record = {}55        for sip_header in return_type_attributes:56            record[sip_header] = event.get_header(sip_header, None)57        result.append(record)58    return result59def queue_params(plpy, ami_host, queue):60    manager = _get_manager(plpy, ami_host)61    manager.register_event('QueueParams', _handle_queue_parameter)62    manager.send_action({ACTION : 'QueueStatus', QUEUE : queue})63    manager.logoff()64    return_type_attributes = _get_type_fields(plpy,'asterisk_queue_params')65    result = []66    for event in manager.event_registry:67        record = {}68        for sip_header in return_type_attributes:69            record[sip_header] = event.get_header(sip_header, None)70        result.append(record)71    return result72def sip_peers(plpy, ami_host):73    manager = _get_manager(plpy, ami_host)74    manager.register_event('PeerEntry', _handle_peerentry)75    manager.sippeers()76    manager.logoff()77    return_type_attributes = _get_type_fields(plpy,'peer_entry')78    result = []79    for event in manager.event_registry:80        record = {}81        for sip_header in return_type_attributes:82            record[sip_header] = event.get_header(sip_header, None)83        result.append(record)84    return result85def sipshowpeer(plpy, ami_host, peer):86    manager = _get_manager(plpy, ami_host)87    ami_result = manager.sipshowpeer(peer=peer)88    manager.logoff()89    return_type_attributes = _get_type_fields(plpy,'peer')90    result = {}91    for sip_header in return_type_attributes:92        result[sip_header] = ami_result.get_header(sip_header, None)93    return result94def originate_async(plpy, ami_host, channel, exten, context, priority):95    manager = _get_manager(plpy, ami_host)96    manager.originate(channel=channel, exten=exten, context=context, priority=priority, async=True)97    manager.logoff()98    return True99def queue_add(plpy, ami_host, queue, interface):100    manager = _get_manager(plpy, ami_host)101    cdict = {ACTION:'QueueAdd'}102    cdict['Interface'] = interface103    cdict[QUEUE] = queue104    cdict['Penalty'] = 1105    cdict['Paused'] = False106    response = manager.send_action(cdict)107    manager.logoff()108    return True109def queue_remove(plpy, ami_host, queue, interface):110    manager = _get_manager(plpy, ami_host)111    cdict = {ACTION:'QueueRemove'}112    cdict['Interface'] = interface113    cdict[QUEUE] = queue114    response = manager.send_action(cdict)115    manager.logoff()...commands.py
Source:commands.py  
2from . import settings3from . import transaction_sender4from .node_manager import NodeManager5def stop_miners(args):6    manager = _get_manager(args)7    manager.stop_miners(_get_miners(args))8def start_miners(args):9    manager = _get_manager(args)10    manager.start_miners(_get_miners(args))11def generate_transactions(args):12    manager = _get_manager(args)13    transaction_sender.generate_transactions(manager)14def list_nodes(args):15    manager = _get_manager(args)16    for node in manager.nodes:17        print(node.name)18def create_contract(args):19    manager = _get_manager(args)20    if args.node:21        node = manager.nodes[args.node]22    else:23        node = manager.get_random_node()24    transaction_sender.create_contract(manager, node, args.contract, args.name)25def generate_contract_calls(args):26    manager = _get_manager(args)27    transaction_sender.generate_contract_calls(manager)28def list_transactions(args):29    manager = _get_manager(args)30    node = manager.get_random_node()31    result = list(node.list_all_transactions())32    if args.output:33        with open(args.output) as f:34            json.dump(result, f)35    else:36        print(json.dumps(result))37def _get_manager(args):38    manager = NodeManager()39    manager.add_nodes_from_dir(args.chain_data)40    return manager41def _get_miners(args):42    if args.miners:43        return args.miners...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
