Best Python code snippet using tempest_python
asset.py
Source:asset.py  
...79        self.asset_name = os.path.basename(self.parsed_name.path)80        # set relative_dir for the asset81        self.relative_dir = os.path.join(self._get_relative_dir(),82                                         self.asset_name)83    def _create_hash_file(self, asset_path):84        """85        Compute the hash of the asset file and add it to the CHECKSUM86        file.87        :param asset_path: full path of the asset file.88        """89        result = crypto.hash_file(asset_path, algorithm=self.algorithm)90        with open(self._get_hash_file(asset_path), 'w') as hash_file:91            hash_file.write('%s %s\n' % (self.algorithm, result))92    def _create_metadata_file(self, asset_file):93        """94        Creates JSON file with metadata.95        The file will be saved as `asset_file`_metadata.json96        :param asset_file: The asset whose metadata will be saved97        :type asset_file: str98        """99        if self.metadata is not None:100            basename = os.path.splitext(asset_file)[0]101            metadata_path = "%s_metadata.json" % basename102            with open(metadata_path, "w") as metadata_file:103                json.dump(self.metadata, metadata_file)104    def _download(self, url_obj, asset_path):105        """106        Download the asset from an uri.107        :param url_obj: object from urlparse.108        :param asset_path: full path of the asset file.109        :returns: if the downloaded file matches the hash.110        :rtype: bool111        """112        try:113            # Temporary unique name to use while downloading114            temp = '%s.%s' % (asset_path,115                              next(tempfile._get_candidate_names()))  # pylint: disable=W0212116            url_download(url_obj.geturl(), temp)117            # Acquire lock only after download the file118            with FileLock(asset_path, 1):119                shutil.copy(temp, asset_path)120                
self._create_hash_file(asset_path)121                return self._verify_hash(asset_path)122        finally:123            try:124                os.remove(temp)125            except FileNotFoundError:126                LOG.info("Temporary asset file unavailable due to failed"127                         " download attempt.")128    @staticmethod129    def _get_hash_file(asset_path):130        """131        Returns the file name that contains the hash for a given asset file132        :param asset_path: full path of the asset file.133        :returns: the CHECKSUM path134        :rtype: str135        """136        return '%s-CHECKSUM' % asset_path137    def _get_hash_from_file(self, asset_path):138        """139        Read the CHECKSUM file from the asset and return the hash.140        :param asset_path: full path of the asset file.141        :returns: the hash, if it exists.142        :rtype: str143        """144        discovered = None145        hash_file = self._get_hash_file(asset_path)146        if not os.path.isfile(hash_file):147            self._create_hash_file(asset_path)148        with open(hash_file, 'r') as hash_file:149            for line in hash_file:150                # md5 is 32 chars big and sha512 is 128 chars big.151                # others supported algorithms are between those.152                pattern = '%s [a-f0-9]{32,128}' % self.algorithm153                if re.match(pattern, line):154                    discovered = line.split()[1]155                    break156        return discovered157    def _get_local_file(self, url_obj, asset_path):158        """159        Create a symlink for a local file into the cache.160        :param url_obj: object from urlparse.161        :param asset_path: full path of the asset file.162        :returns: if the local file matches the hash.163        :rtype: bool164        """165        if os.path.isdir(url_obj.path):166            path = os.path.join(url_obj.path, self.name)167        else:168            
path = url_obj.path169        with FileLock(asset_path, 1):170            try:171                os.symlink(path, asset_path)172                self._create_hash_file(asset_path)173                return self._verify_hash(asset_path)174            except OSError as detail:175                if detail.errno == errno.EEXIST:176                    os.remove(asset_path)177                    os.symlink(path, asset_path)178                    self._create_hash_file(asset_path)179                    return self._verify_hash(asset_path)180    def _get_relative_dir(self):181        """182        When an asset name is not an URI, and:183          1. it also has a hash;184          2. or it has multiple locations;185        there's a clear intention for it to be unique *by name*, overwriting186        it if the file is corrupted or expired. These will be stored in the187        cache directory indexed by name.188        When an asset name is an URI, whether it has a hash or not, it will be189        saved according to their locations, so that multiple assets with the190        same file name, but completely unrelated to each other, will still191        coexist.192        :returns: target location of asset the file....test_accounts.py
Source:test_accounts.py  
...78        # Emulate the lock existing on the filesystem79        self.useFixture(mockpatch.Patch('os.path.isfile', return_value=True))80        with mock.patch('__builtin__.open', mock.mock_open(), create=True):81            test_account_class = accounts.Accounts('test_name')82            res = test_account_class._create_hash_file('12345')83        self.assertFalse(res, "_create_hash_file should return False if the "84                         "pseudo-lock file already exists")85    def test_create_hash_file_no_previous_file(self):86        # Emulate the lock not existing on the filesystem87        self.useFixture(mockpatch.Patch('os.path.isfile', return_value=False))88        with mock.patch('__builtin__.open', mock.mock_open(), create=True):89            test_account_class = accounts.Accounts('test_name')90            res = test_account_class._create_hash_file('12345')91        self.assertTrue(res, "_create_hash_file should return True if the "92                        "pseudo-lock doesn't already exist")93    @mock.patch('tempest.openstack.common.lockutils.lock')94    def test_get_free_hash_no_previous_accounts(self, lock_mock):95        # Emulate no pre-existing lock96        self.useFixture(mockpatch.Patch('os.path.isdir', return_value=False))97        hash_list = self._get_hash_list(self.test_accounts)98        mkdir_mock = self.useFixture(mockpatch.Patch('os.mkdir'))99        self.useFixture(mockpatch.Patch('os.path.isfile', return_value=False))100        test_account_class = accounts.Accounts('test_name')101        with mock.patch('__builtin__.open', mock.mock_open(),102                        create=True) as open_mock:103            test_account_class._get_free_hash(hash_list)104            lock_path = os.path.join(accounts.CONF.lock_path, 'test_accounts',...accounts.py
Source:accounts.py  
...42            hash_dict[temp_hash.hexdigest()] = account43        return hash_dict44    def is_multi_user(self):45        return len(self.hash_dict) > 146    def _create_hash_file(self, hash_string):47        path = os.path.join(os.path.join(self.accounts_dir, hash_string))48        if not os.path.isfile(path):49            open(path, 'w').close()50            return True51        return False52    @lockutils.synchronized('test_accounts_io', external=True)53    def _get_free_hash(self, hashes):54        if not os.path.isdir(self.accounts_dir):55            os.mkdir(self.accounts_dir)56            # Create File from first hash (since none are in use)57            self._create_hash_file(hashes[0])58            return hashes[0]59        for _hash in hashes:60            res = self._create_hash_file(_hash)61            if res:62                return _hash63        msg = 'Insufficient number of users provided'64        raise exceptions.InvalidConfiguration(msg)65    def _get_creds(self):66        free_hash = self._get_free_hash(self.hash_dict.keys())67        return self.hash_dict[free_hash]68    @lockutils.synchronized('test_accounts_io', external=True)69    def remove_hash(self, hash_string):70        hash_path = os.path.join(self.accounts_dir, hash_string)71        if not os.path.isfile(hash_path):72            LOG.warning('Expected an account lock file %s to remove, but '73                        'one did not exist')74        else:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
