Best Python code snippet using stestr_python
dell_storagecenter_api.py
Source:dell_storagecenter_api.py  
...151        except AttributeError:152            LOG.error(_LE('Error invalid json: %s'),153                      blob)154        return None155    def _get_id(self, blob):156        try:157            if isinstance(blob, dict):158                return blob.get('instanceId')159        except AttributeError:160            LOG.error(_LE('Invalid API object: %s'),161                      blob)162        return None163    def open_connection(self):164        # Authenticate against EM165        payload = {}166        payload['Application'] = 'Cinder REST Driver'167        payload['ApplicationVersion'] = self.APIVERSION168        r = self.client.post('ApiConnection/Login',169                             payload)170        if r.status_code != 200:171            LOG.error(_LE('Login error: %(c)d %(r)s'),172                      {'c': r.status_code,173                       'r': r.reason})174            raise exception.VolumeBackendAPIException(175                _('Failed to connect to Enterprise Manager'))176    def close_connection(self):177        r = self.client.post('ApiConnection/Logout',178                             {})179        if r.status_code != 204:180            LOG.warning(_LW('Logout error: %(c)d %(r)s'),181                        {'c': r.status_code,182                         'r': r.reason})183        self.client = None184    def find_sc(self, ssn):185        '''This is really just a check that the sc is there and being managed by186        EM.187        '''188        r = self.client.get('StorageCenter/StorageCenter')189        result = self._get_result(r,190                                  'scSerialNumber',191                                  ssn)192        if result is None:193            LOG.error(_LE('Failed to find %(s)s.  
Result %(r)s'),194                      {'s': ssn,195                       'r': r})196            raise exception.VolumeBackendAPIException(197                _('Failed to find Storage Center'))198        return self._get_id(result)199    # Volume functions200    def _create_folder(self, url, ssn, parent, folder):201        '''This is generic to server and volume folders.202        '''203        f = None204        payload = {}205        payload['Name'] = folder206        payload['StorageCenter'] = ssn207        if parent != '':208            payload['Parent'] = parent209        payload['Notes'] = self.notes210        r = self.client.post(url,211                             payload)212        if r.status_code != 201:213            LOG.debug('%(u)s error: %(c)d %(r)s',214                      {'u': url,215                       'c': r.status_code,216                       'r': r.reason})217        else:218            f = self._first_result(r)219        return f220    def _create_folder_path(self, url, ssn, foldername):221        '''This is generic to server and volume folders.222        '''223        path = self._path_to_array(foldername)224        folderpath = ''225        instanceId = ''226        # Technically the first folder is the root so that is already created.227        found = True228        f = None229        for folder in path:230            folderpath = folderpath + folder231            # If the last was found see if this part of the path exists too232            if found:233                listurl = url + '/GetList'234                f = self._find_folder(listurl,235                                      ssn,236                                      folderpath)237                if f is None:238                    found = False239            # We didn't find it so create it240            if found is False:241                f = self._create_folder(url,242                                        ssn,243                                        instanceId,244 
                                       folder)245            # If we haven't found a folder or created it then leave246            if f is None:247                LOG.error(_LE('Unable to create folder path %s'),248                          folderpath)249                break250            # Next part of the path will need this251            instanceId = self._get_id(f)252            folderpath = folderpath + '/'253        return f254    def _find_folder(self, url, ssn, foldername):255        '''Most of the time the folder will already have been created so256        we look for the end folder and check that the rest of the path is257        right.258        This is generic to server and volume folders.259        '''260        pf = PayloadFilter()261        pf.append('scSerialNumber', ssn)262        basename = os.path.basename(foldername)263        pf.append('Name', basename)264        # If we have any kind of path we add '/' to match the storage265        # center's convention and throw it into the filters.266        folderpath = os.path.dirname(foldername)267        if folderpath != '':268            folderpath += '/'269            pf.append('folderPath', folderpath)270        folder = None271        r = self.client.post(url,272                             pf.payload)273        if r.status_code == 200:274            folder = self._get_result(r,275                                      'folderPath',276                                      folderpath)277        else:278            LOG.debug('%(u)s error: %(c)d %(r)s',279                      {'u': url,280                       'c': r.status_code,281                       'r': r.reason})282        return folder283    def _create_volume_folder_path(self, ssn, foldername):284        return self._create_folder_path('StorageCenter/ScVolumeFolder',285                                        ssn,286                                        foldername)287    def _find_volume_folder(self, ssn, foldername):288        return 
self._find_folder('StorageCenter/ScVolumeFolder/GetList',289                                 ssn,290                                 foldername)291    def _init_volume(self, scvolume):292        '''Maps the volume to a random server and immediately unmaps293        it.  This initializes the volume.294        Don't wig out if this fails.295        '''296        pf = PayloadFilter()297        pf.append('scSerialNumber', scvolume.get('scSerialNumber'), 'Equals')298        r = self.client.post('StorageCenter/ScServer/GetList', pf.payload)299        if r.status_code == 200:300            scservers = self._get_json(r)301            # Sort through the servers looking for one with connectivity.302            for scserver in scservers:303                # TODO(tom_swanson): Add check for server type.304                # This needs to be either a physical or virtual server.305                # Outside of tempest tests this should not matter as we only306                # "init" a volume to allow snapshotting of an empty volume.307                if scserver.get('status', '').lower() != 'down':308                    # Map to actually create the volume309                    self.map_volume(scvolume,310                                    scserver)311                    self.unmap_volume(scvolume,312                                      scserver)313                    break314    def create_volume(self, name, size, ssn, volfolder):315        '''This creates a new volume on the storage center.  It316        will create it in volfolder.  If volfolder does not317        exist it will create it.  
If it cannot create volfolder318        the volume will be created in the root.319        '''320        scvolume = None321        # Find our folder322        LOG.debug('Create Volume %(name)s %(ssn)s %(folder)s',323                  {'name': name,324                   'ssn': ssn,325                   'folder': volfolder})326        folder = self._find_volume_folder(ssn,327                                          volfolder)328        # Doesn't exist?  make it329        if folder is None:330            folder = self._create_volume_folder_path(ssn,331                                                     volfolder)332        # If we actually have a place to put our volume create it333        if folder is None:334            LOG.error(_LE('Unable to create folder %s'),335                      volfolder)336        # Create the volume337        payload = {}338        payload['Name'] = name339        payload['Notes'] = self.notes340        payload['Size'] = '%d GB' % size341        payload['StorageCenter'] = ssn342        if folder is not None:343            payload['VolumeFolder'] = self._get_id(folder)344        r = self.client.post('StorageCenter/ScVolume',345                             payload)346        if r.status_code == 201:347            scvolume = self._get_json(r)348        else:349            LOG.error(_LE('ScVolume create error %(name)s: %(c)d %(r)s'),350                      {'name': name,351                       'c': r.status_code,352                       'r': r.reason})353        if scvolume:354            LOG.info(_LI('Created volume %(instanceId)s: %(name)s'),355                     {'instanceId': scvolume['instanceId'],356                      'name': scvolume['name']})357        else:358            LOG.error(_LE('ScVolume returned success with empty payload.'359                          '  Attempting to locate volume'))360            # In theory it is there since success was returned.361            # Try one last time to find it before returning.362  
          scvolume = self.find_volume(ssn, name, None)363        return scvolume364    def find_volume(self, ssn, name=None, instanceid=None):365        '''search ssn for volume of name and/or instance id366        '''367        LOG.debug('finding volume %(sn)s : %(name)s : %(id)s',368                  {'sn': ssn,369                   'name': name,370                   'id': instanceid})371        pf = PayloadFilter()372        pf.append('scSerialNumber', ssn)373        # We need at least a name and or an instance id.  If we have374        # that we can find a volume.375        if instanceid is not None:376            pf.append('instanceId', instanceid)377        elif name is not None:378            pf.append('Name', name)379        else:380            return None381        r = self.client.post('StorageCenter/ScVolume/GetList',382                             pf.payload)383        if r.status_code != 200:384            LOG.debug('ScVolume GetList error %(i)s: %(c)d %(r)s',385                      {'i': instanceid,386                       'c': r.status_code,387                       'r': r.reason})388        return self._first_result(r)389    def delete_volume(self, ssn, name):390        # find our volume391        vol = self.find_volume(ssn, name, None)392        if vol is not None:393            r = self.client.delete('StorageCenter/ScVolume/%s'394                                   % self._get_id(vol))395            if r.status_code != 200:396                raise exception.VolumeBackendAPIException(397                    _('Error deleting volume %(ssn)s: %(sn)s: %(c)d %(r)s') %398                    {'ssn': ssn,399                     'sn': name,400                     'c': r.status_code,401                     'r': r.reason})402            # json return should be true or false403            return self._get_json(r)404        LOG.warning(_LW('delete_volume: unable to find volume %s'),405                    name)406        # If we can't find the volume then it is 
effectively gone.407        return True408    def _create_server_folder_path(self, ssn, foldername):409        return self._create_folder_path('StorageCenter/ScServerFolder',410                                        ssn,411                                        foldername)412    def _find_server_folder(self, ssn, foldername):413        return self._find_folder('StorageCenter/ScServerFolder/GetList',414                                 ssn,415                                 foldername)416    def _add_hba(self, scserver, wwnoriscsiname, isfc=False):417        '''Adds an HBA to the scserver.  The HBA will be added418        even if it has not been seen by the storage center.419        '''420        payload = {}421        if isfc is True:422            payload['HbaPortType'] = 'FibreChannel'423        else:424            payload['HbaPortType'] = 'Iscsi'425        payload['WwnOrIscsiName'] = wwnoriscsiname426        payload['AllowManual'] = True427        r = self.client.post('StorageCenter/ScPhysicalServer/%s/AddHba'428                             % self._get_id(scserver),429                             payload)430        if r.status_code != 200:431            LOG.error(_LE('AddHba error: %(i)s to %(s)s : %(c)d %(r)s'),432                      {'i': wwnoriscsiname,433                       's': scserver['name'],434                       'c': r.status_code,435                       'r': r.reason})436            return False437        return True438    # We do not know that we are red hat linux 6.x but that works439    # best for red hat and ubuntu.  
So, there.440    def _find_serveros(self, ssn, osname='Red Hat Linux 6.x'):441        '''Returns the serveros instance id of the specified osname.442        Required to create a server.443        '''444        pf = PayloadFilter()445        pf.append('scSerialNumber', ssn)446        r = self.client.post('StorageCenter/ScServerOperatingSystem/GetList',447                             pf.payload)448        if r.status_code == 200:449            oslist = self._get_json(r)450            for srvos in oslist:451                name = srvos.get('name', 'nope')452                if name.lower() == osname.lower():453                    # Found it return the id454                    return self._get_id(srvos)455        LOG.warning(_LW('ScServerOperatingSystem GetList return: %(c)d %(r)s'),456                    {'c': r.status_code,457                     'r': r.reason})458        return None459    def create_server_multiple_hbas(self, ssn, foldername, wwns):460        '''Same as create_server except it can take a list of hbas.  hbas461        can be wwns or iqns.462        '''463        # Add hbas464        scserver = None465        # Our instance names466        for wwn in wwns:467            if scserver is None:468                # Use the fist wwn to create the server.469                scserver = self.create_server(ssn,470                                              foldername,471                                              wwn,472                                              True)473            else:474                # Add the wwn to our server475                self._add_hba(scserver,476                              wwn,477                              True)478        return scserver479    def create_server(self, ssn, foldername, wwnoriscsiname, isfc=False):480        '''creates a server on the the storage center ssn.  
Adds the first481        HBA to it.482        '''483        scserver = None484        payload = {}485        payload['Name'] = 'Server_' + wwnoriscsiname486        payload['StorageCenter'] = ssn487        payload['Notes'] = self.notes488        # We pick Red Hat Linux 6.x because it supports multipath and489        # will attach luns to paths as they are found.490        scserveros = self._find_serveros(ssn, 'Red Hat Linux 6.x')491        if scserveros is not None:492            payload['OperatingSystem'] = scserveros493        # Find our folder or make it494        folder = self._find_server_folder(ssn,495                                          foldername)496        if folder is None:497            folder = self._create_server_folder_path(ssn,498                                                     foldername)499        # At this point it doesn't matter if the folder was created or not.500        # We just attempt to create the server.  Let it be in the root if501        # the folder creation fails.502        if folder is not None:503            payload['ServerFolder'] = self._get_id(folder)504        # create our server505        r = self.client.post('StorageCenter/ScPhysicalServer',506                             payload)507        if r.status_code != 201:508            LOG.error(_LE('ScPhysicalServer create error: %(i)s: %(c)d %(r)s'),509                      {'i': wwnoriscsiname,510                       'c': r.status_code,511                       'r': r.reason})512        else:513            # Server was created514            scserver = self._first_result(r)515            # Add hba to our server516            if scserver is not None:517                if not self._add_hba(scserver,518                                     wwnoriscsiname,519                                     isfc):520                    LOG.error(_LE('Error adding HBA to server'))521                    # Can't have a server without an HBA522                    
self._delete_server(scserver)523                    scserver = None524        # Success or failure is determined by the caller525        return scserver526    def find_server(self, ssn, instance_name):527        '''Hunts for a server by looking for an HBA with the server's IQN528        or wwn.529        If found, the server the HBA is attached to, if any, is returned.530        '''531        scserver = None532        # We search for our server by first finding our HBA533        hba = self._find_serverhba(ssn, instance_name)534        # Once created hbas stay in the system.  So it isn't enough535        # that we found one it actually has to be attached to a536        # server.537        if hba is not None and hba.get('server') is not None:538            pf = PayloadFilter()539            pf.append('scSerialNumber', ssn)540            pf.append('instanceId', self._get_id(hba['server']))541            r = self.client.post('StorageCenter/ScServer/GetList',542                                 pf.payload)543            if r.status_code != 200:544                LOG.error(_LE('ScServer error: %(c)d %(r)s'),545                          {'c': r.status_code,546                           'r': r.reason})547            else:548                scserver = self._first_result(r)549        if scserver is None:550            LOG.debug('Server (%s) not found.',551                      instance_name)552        return scserver553    def _find_serverhba(self, ssn, instance_name):554        '''Hunts for a sc server HBA by looking for an HBA with the555        server's IQN or wwn.556        If found, the sc server HBA is returned.557        '''558        scserverhba = None559        # We search for our server by first finding our HBA560        pf = PayloadFilter()561        pf.append('scSerialNumber', ssn)562        pf.append('instanceName', instance_name)563        r = self.client.post('StorageCenter/ScServerHba/GetList',564                             pf.payload)565        if 
r.status_code != 200:566            LOG.debug('ScServerHba error: %(c)d %(r)s',567                      {'c': r.status_code,568                       'r': r.reason})569        else:570            scserverhba = self._first_result(r)571        return scserverhba572    def _find_domains(self, cportid):573        r = self.client.get('StorageCenter/ScControllerPort/%s/FaultDomainList'574                            % cportid)575        if r.status_code == 200:576            domains = self._get_json(r)577            return domains578        else:579            LOG.debug('FaultDomainList error: %(c)d %(r)s',580                      {'c': r.status_code,581                       'r': r.reason})582            LOG.error(_LE('Error getting FaultDomainList'))583        return None584    def _find_domain(self, cportid, domainip):585        '''Returns the fault domain which a given controller port can586        be seen by the server587        '''588        domains = self._find_domains(cportid)589        if domains:590            # Wiffle through the domains looking for our591            # configured ip.592            for domain in domains:593                # If this is us we return the port.594                if domain.get('targetIpv4Address',595                              domain.get('wellKnownIpAddress')) == domainip:596                    return domain597        return None598    def _find_fc_initiators(self, scserver):599        '''_find_fc_initiators600        returns the server's fc HBA's wwns601        '''602        initiators = []603        r = self.client.get('StorageCenter/ScServer/%s/HbaList'604                            % self._get_id(scserver))605        if r.status_code == 200:606            hbas = self._get_json(r)607            for hba in hbas:608                wwn = hba.get('instanceName')609                if (hba.get('portType') == 'FibreChannel' and610                        wwn is not None):611                    initiators.append(wwn)612        else:613   
         LOG.debug('HbaList error: %(c)d %(r)s',614                      {'c': r.status_code,615                       'r': r.reason})616            LOG.error(_LE('Unable to find FC intitiators'))617        return initiators618    def get_volume_count(self, scserver):619        r = self.client.get('StorageCenter/ScServer/%s/MappingList'620                            % self._get_id(scserver))621        if r.status_code == 200:622            mappings = self._get_json(r)623            return len(mappings)624        # Panic mildly but do not return 0.625        return -1626    def _find_mappings(self, scvolume):627        '''find mappings628        returns the volume's mappings629        '''630        mappings = []631        if scvolume.get('active', False):632            r = self.client.get('StorageCenter/ScVolume/%s/MappingList'633                                % self._get_id(scvolume))634            if r.status_code == 200:635                mappings = self._get_json(r)636            else:637                LOG.debug('MappingList error: %(c)d %(r)s',638                          {'c': r.status_code,639                           'r': r.reason})640                LOG.error(_LE('Unable to find volume mappings: %s'),641                          scvolume.get('name'))642        else:643            LOG.error(_LE('_find_mappings: volume is not active'))644        return mappings645    def _find_controller_port(self, cportid):646        '''_find_controller_port647        returns the controller port dict648        '''649        controllerport = None650        r = self.client.get('StorageCenter/ScControllerPort/%s'651                            % cportid)652        if r.status_code == 200:653            controllerport = self._first_result(r)654        else:655            LOG.debug('ScControllerPort error: %(c)d %(r)s',656                      {'c': r.status_code,657                       'r': r.reason})658            LOG.error(_LE('Unable to find controller port: %s'),659      
                cportid)660        return controllerport661    def find_wwns(self, scvolume, scserver):662        '''returns the lun and wwns of the mapped volume'''663        # Our returnables664        lun = None  # our lun.  We return the first lun.665        wwns = []  # list of targets666        itmap = {}  # dict of initiators and the associated targets667        # Make sure we know our server's initiators.  Only return668        # mappings that contain HBA for this server.669        initiators = self._find_fc_initiators(scserver)670        # Get our volume mappings671        mappings = self._find_mappings(scvolume)672        if len(mappings) > 0:673            # We check each of our mappings.  We want to return674            # the mapping we have been configured to use.675            for mapping in mappings:676                # Find the controller port for this mapping677                cport = mapping.get('controllerPort')678                controllerport = self._find_controller_port(679                    self._get_id(cport))680                if controllerport is not None:681                    # This changed case at one point or another.682                    # Look for both keys.683                    wwn = controllerport.get('wwn',684                                             controllerport.get('WWN'))685                    if wwn is None:686                        LOG.error(_LE('Find_wwns: Unable to find port wwn'))687                    serverhba = mapping.get('serverHba')688                    if wwn is not None and serverhba is not None:689                        hbaname = serverhba.get('instanceName')690                        if hbaname in initiators:691                            if itmap.get(hbaname) is None:692                                itmap[hbaname] = []693                            itmap[hbaname].append(wwn)694                            wwns.append(wwn)695                            mappinglun = mapping.get('lun')696                
            if lun is None:697                                lun = mappinglun698                            elif lun != mappinglun:699                                LOG.warning(_LW('Inconsistent Luns.'))700        else:701            LOG.error(_LE('Find_wwns: Volume appears unmapped'))702        LOG.debug(lun)703        LOG.debug(wwns)704        LOG.debug(itmap)705        # TODO(tom_swanson): if we have nothing to return raise an exception706        # here.  We can't do anything with an unmapped volume.  We shouldn't707        # pretend we succeeded.708        return lun, wwns, itmap709    def _find_active_controller(self, scvolume):710        LOG.debug('find_active_controller')711        activecontroller = None712        r = self.client.get('StorageCenter/ScVolume/%s/VolumeConfiguration'713                            % self._get_id(scvolume))714        if r.status_code == 200:715            volumeconfiguration = self._first_result(r)716            controller = volumeconfiguration.get('controller')717            activecontroller = self._get_id(controller)718        LOG.debug('activecontroller %s', activecontroller)719        return activecontroller720    def find_iscsi_properties(self, scvolume, ip=None, port=None):721        LOG.debug('enter find_iscsi_properties')722        LOG.debug('scvolume: %s', scvolume)723        activeindex = -1724        luns = []725        iqns = []726        portals = []727        access_mode = 'rw'728        mappings = self._find_mappings(scvolume)729        activecontroller = self._find_active_controller(scvolume)730        if len(mappings) > 0:731            for mapping in mappings:732                LOG.debug('mapping: %s', mapping)733                # find the controller port for this mapping734                cport = mapping.get('controllerPort')735                cportid = self._get_id(cport)736                domains = self._find_domains(cportid)737                if domains:738                    controllerport = 
self._find_controller_port(cportid)739                    LOG.debug('controllerport: %s', controllerport)740                    if controllerport is not None:741                        appendproperties = False742                        for d in domains:743                            LOG.debug('domain: %s', d)744                            ipaddress = d.get('targetIpv4Address',745                                              d.get('wellKnownIpAddress'))746                            portnumber = d.get('portNumber')747                            if ((ip is None or ip == ipaddress) and748                                    (port is None or port == portnumber)):749                                portal = (ipaddress + ':' +750                                          six.text_type(portnumber))751                                # I'm not sure when we can have more than752                                # one portal for a domain but since it is an753                                # array being returned it is best to check.754                                if portals.count(portal) == 0:755                                    appendproperties = True756                                    portals.append(portal)757                                else:758                                    LOG.debug('Domain %s has two portals.',759                                              self._get_id(d))760                        # We do not report lun and iqn info unless it is for761                        # the configured port OR the user has not enabled762                        # multipath.  
(In which case ip and port sent in763                        # will be None).764                        if appendproperties is True:765                            iqns.append(controllerport.get('iscsiName'))766                            luns.append(mapping.get('lun'))767                            if activeindex == -1:768                                controller = controllerport.get('controller')769                                controllerid = self._get_id(controller)770                                if controllerid == activecontroller:771                                    activeindex = len(iqns) - 1772                        if mapping['readOnly'] is True:773                            access_mode = 'ro'774        if activeindex == -1:775            LOG.debug('Volume is not yet active on any controller.')776            activeindex = 0777        data = {'target_discovered': False,778                'target_iqns': iqns,779                'target_portals': portals,780                'target_luns': luns,781                'access_mode': access_mode782                }783        LOG.debug('find_iscsi_properties return: %s', data)784        return activeindex, data785    def map_volume(self, scvolume, scserver):786        '''map_volume787        The check for server existence is elsewhere;  does not create the788        server.789        '''790        # Make sure we have what we think we have791        serverid = self._get_id(scserver)792        volumeid = self._get_id(scvolume)793        if serverid is not None and volumeid is not None:794            payload = {}795            payload['server'] = serverid796            advanced = {}797            advanced['MapToDownServerHbas'] = True798            payload['Advanced'] = advanced799            r = self.client.post('StorageCenter/ScVolume/%s/MapToServer'800                                 % volumeid,801                                 payload)802            if r.status_code == 200:803                # We just return 
our mapping804                return self._first_result(r)805            # Should not be here.806            LOG.debug('MapToServer error: %(c)d %(r)s',807                      {'c': r.status_code,808                       'r': r.reason})809        # Error out810        LOG.error(_LE('Unable to map %(vol)s to %(srv)s'),811                  {'vol': scvolume['name'],812                   'srv': scserver['name']})813        return None814    def unmap_volume(self, scvolume, scserver):815        '''unmap_volume816        deletes all mappings to a server, not just the ones on the path817        defined in cinder.conf.818        '''819        rtn = True820        serverid = self._get_id(scserver)821        volumeid = self._get_id(scvolume)822        if serverid is not None and volumeid is not None:823            r = self.client.get('StorageCenter/ScVolume/%s/MappingProfileList'824                                % volumeid)825            if r.status_code == 200:826                profiles = self._get_json(r)827                for profile in profiles:828                    prosrv = profile.get('server')829                    if prosrv is not None and self._get_id(prosrv) == serverid:830                        r = self.client.delete(831                            'StorageCenter/ScMappingProfile/%s'832                            % self._get_id(profile))833                        if (r.status_code != 200 or r.ok is False):834                            LOG.debug('ScMappingProfile error: %(c)d %(r)s',835                                      {'c': r.status_code,836                                       'r': r.reason})837                            LOG.error(_LE('Unable to unmap Volume %s'),838                                      volumeid)839                            # 1 failed unmap is as good as 100.840                            # Fail it and leave841                            rtn = False842                            break843                        LOG.debug('Volume 
%(v)s unmapped from %(s)s',844                                  {'v': volumeid,845                                   's': serverid})846            else:847                LOG.debug('MappingProfileList error: %(c)d %(r)s',848                          {'c': r.status_code,849                           'r': r.reason})850                rtn = False851        return rtn852    def get_storage_usage(self, ssn):853        '''get_storage_usage'''854        storageusage = None855        if ssn is not None:856            r = self.client.get('StorageCenter/StorageCenter/%s/StorageUsage'857                                % ssn)858            if r.status_code == 200:859                storageusage = self._get_json(r)860            else:861                LOG.debug('StorageUsage error: %(c)d %(r)s',862                          {'c': r.status_code,863                           'r': r.reason})864        return storageusage865    def create_replay(self, scvolume, replayid, expire):866        '''create_replay867        expire is in minutes.868        one could snap a volume before it has been activated, so activate869        by mapping and unmapping to a random server and let them.  
This870        should be a fail but the Tempest tests require it.871        '''872        replay = None873        if scvolume is not None:874            if (scvolume.get('active') is not True or875                    scvolume.get('replayAllowed') is not True):876                self._init_volume(scvolume)877            payload = {}878            payload['description'] = replayid879            payload['expireTime'] = expire880            r = self.client.post('StorageCenter/ScVolume/%s/CreateReplay'881                                 % self._get_id(scvolume),882                                 payload)883            if r.status_code != 200:884                LOG.debug('CreateReplay error: %(c)d %(r)s',885                          {'c': r.status_code,886                           'r': r.reason})887                LOG.error(_LE('Error creating replay.'))888            else:889                replay = self._first_result(r)890        return replay891    def find_replay(self, scvolume, replayid):892        '''find_replay893        searches for the replay by replayid which we store in the894        replay's description attribute895        '''896        replay = None897        r = self.client.get('StorageCenter/ScVolume/%s/ReplayList'898                            % self._get_id(scvolume))899        try:900            content = self._get_json(r)901            # This will be a list.  If it isn't bail902            if isinstance(content, list):903                for r in content:904                    # The only place to save our information with the public905                    # api is the description field which isn't quite long906                    # enough.  
So we check that our description is pretty much907                    # the max length and we compare that to the start of908                    # the snapshot id.909                    description = r.get('description')910                    if (len(description) >= 30 and911                            replayid.startswith(description) is True and912                            r.get('markedForExpiration') is not True):913                        replay = r914                        break915        except Exception:916            LOG.error(_LE('Invalid ReplayList return: %s'),917                      r)918        if replay is None:919            LOG.debug('Unable to find snapshot %s',920                      replayid)921        return replay922    def delete_replay(self, scvolume, replayid):923        '''delete_replay924        hunts down a replay by replayid string and expires it.925        once marked for expiration we do not return the replay as926        a snapshot.927        '''928        LOG.debug('Expiring replay %s', replayid)929        replay = self.find_replay(scvolume,930                                  replayid)931        if replay is not None:932            r = self.client.post('StorageCenter/ScReplay/%s/Expire'933                                 % self._get_id(replay),934                                 {})935            if r.status_code != 204:936                LOG.debug('ScReplay Expire error: %(c)d %(r)s',937                          {'c': r.status_code,938                           'r': r.reason})939                return False940        # We either couldn't find it or expired it.941        return True942    def create_view_volume(self, volname, volfolder, screplay):943        '''create_view_volume944        creates a new volume named volname in the folder945        volfolder from the screplay.946        '''947        # find our ssn and get our folder948        ssn = screplay.get('scSerialNumber')949        folder = self._find_volume_folder(ssn,950 
                                         volfolder)951        # Doesn't exist?  make it952        if folder is None:953            folder = self._create_volume_folder_path(ssn,954                                                     volfolder)955        # payload is just the volume name and folder if we have one.956        payload = {}957        payload['Name'] = volname958        payload['Notes'] = self.notes959        if folder is not None:960            payload['VolumeFolder'] = self._get_id(folder)961        r = self.client.post('StorageCenter/ScReplay/%s/CreateView'962                             % self._get_id(screplay),963                             payload)964        volume = None965        if r.status_code == 200:966            volume = self._first_result(r)967        else:968            LOG.debug('ScReplay CreateView error: %(c)d %(r)s',969                      {'c': r.status_code,970                       'r': r.reason})971        if volume is None:972            LOG.error(_LE('Unable to create volume %s from replay'),973                      volname)974        return volume975    def create_cloned_volume(self, volumename, volumefolder, scvolume):976        '''create_cloned_volume977        creates a temporary replay and then creates a978        view volume from that.979        '''980        clone = None981        replay = self.create_replay(scvolume,982                                    'Cinder Clone Replay',983                                    60)984        if replay is not None:985            clone = self.create_view_volume(volumename,986                                            volumefolder,987                                            replay)988        else:989            LOG.error(_LE('Error: unable to snap replay'))990        return clone991    def expand_volume(self, scvolume, newsize):992        '''expand_volume'''993        payload = {}994        payload['NewSize'] = '%d GB' % newsize995        r = 
self.client.post('StorageCenter/ScVolume/%s/ExpandToSize'996                             % self._get_id(scvolume),997                             payload)998        vol = None999        if r.status_code == 200:1000            vol = self._get_json(r)1001        else:1002            LOG.error(_LE('Error expanding volume %(n)s: %(c)d %(r)s'),1003                      {'n': scvolume['name'],1004                       'c': r.status_code,1005                       'r': r.reason})1006        if vol is not None:1007            LOG.debug('Volume expanded: %(i)s %(s)s',1008                      {'i': vol['instanceId'],1009                       's': vol['configuredSize']})1010        return vol1011    def _delete_server(self, scserver):1012        '''_delete_server1013        Just give it a shot.  If it fails it doesn't matter to cinder.1014        '''1015        if scserver.get('deleteAllowed') is True:1016            r = self.client.delete('StorageCenter/ScServer/%s'1017                                   % self._get_id(scserver))1018            LOG.debug('ScServer %(i)s delete return: %(c)d %(r)s',1019                      {'i': self._get_id(scserver),1020                       'c': r.status_code,1021                       'r': r.reason})1022        else:...table.py
Source:table.py  
...21                       'id': 'new_col_id',22                       'renamable': True,23                       'type': 'numeric'}24INDEX_STYLE = {'textAlign': 'left', 'fontWeight': 'bold'}25def _get_id(subcomponent_id, aio_id):26    return {27        'component': 'CustomDataTable',28        'subcomponent': subcomponent_id,29        'aio_id': aio_id30    }31def _init_datatable_values(dataframe: pd.DataFrame, index_col: List[str]):32    """Initialize main Dash.DataTable parameters33    :param dataframe: Dataframe containing the values to generate the table from. The index are not34    significant. However the columns are.35    :param index_col:36    :return: List[Dict], List[Dict], int37    """38    # generate mapping between columns name and unique id39    columns = [{"name": col, "id": str(uuid4()), **DEFAULT_COLUMNS_OPTIONS} if col not in index_col40               else {"name": col, "id": col, **INDEX_COLUMNS_OPTIONS}41               for col in dataframe.columns]42    columns_count = len(columns) - 143    datas = dataframe.copy()44    datas['id'] = np.arange(datas.shape[0])45    datas = datas. \46        rename(columns={value['name']: value['id'] for value in columns}). \47        to_dict('records')48    # append the last column49    if all([col != NEW_COLUMNS_OPTIONS['id'] for col in columns]):50        columns += [NEW_COLUMNS_OPTIONS]51    for item in datas:52        item[NEW_COLUMNS_OPTIONS['id']] = ''53    return datas, columns, columns_count54def _as_dataframe(records: List[Dict]) -> pd.DataFrame:55    """Transform records into dataframe. Also takes care of None and nan values.56    :param records: Expects a List of dictionary with at least 'id' as key.57    :return: DataFrame with id column as index58    """59    return pd.DataFrame(records). \60        replace([None], np.nan). \61        replace(np.nan, ''). \62        set_index('id')63def _to_records(dataframe: pd.DataFrame) -> List[Dict]:64    """Transform dataframe into records. 
Replace empty strings by Nan values65    :param dataframe: Expects a dataframe with 'id' as index.66    :return: List of dictionaries67    """68    return dataframe. \69        replace('', np.nan). \70        reset_index(). \71        to_dict('records')72def _get_visible_records(dataframe: pd.DataFrame, is_hidden: bool, columns: List[Dict]) -> \73        List[Dict]:74    """75    :param dataframe: Expects empty strings when76    :param is_hidden: When True, return only77    :param columns: List of dictionaries. Expects 'type' as key holding either 'text' or78    'numeric' values.79    :return:80    """81    if not is_hidden:82        return _to_records(dataframe)83    else:84        # get rows that are not empty (-1 because of label in first column)85        len_index_col = sum([col['type'] == 'text' for col in columns])86        mask = (dataframe != '').sum(axis=1) - len_index_col != 087        return _to_records(dataframe[mask])88def _has_last_column_values_changed(data: pd.DataFrame, data_previous: pd.DataFrame) -> bool:89    change = data != data_previous90    changed_column = data.columns[change.any(axis=0)]91    return NEW_COLUMNS_OPTIONS['id'] in changed_column92def _is_last_column_renamed(columns: List[Dict]) -> bool:93    return columns[-1]['name'] != NEW_COLUMNS_OPTIONS['name']94def _sync_change_with_data_reference(datas: pd.DataFrame, datas_reference: pd.DataFrame) -> \95        Tuple[pd.DataFrame, pd.DataFrame]:96    """Synchronize data_reference with datas. Reindex datas_reference columns with datas columns in97    case columns deletion. 
Update datas_reference values with datas values on common position."""98    datas_reference = datas_reference.reindex(columns=datas.columns)99    datas_reference.loc[datas.index, datas.columns] = datas100    return datas, datas_reference101def _get_records(datas_reference: pd.DataFrame, is_hidden: bool, columns: List[Dict]) -> \102        Tuple[List[Dict], List[Dict]]:103    """Returns datas_reference as records and datas as visible records."""104    data = _get_visible_records(datas_reference, is_hidden, columns)105    datas_reference = _to_records(datas_reference)106    return data, datas_reference107def _append_in_last_column(data_reference: pd.DataFrame, columns_count: int,108                           columns: List[Dict]):109    columns_count += 1110    new_column_name = f'Col {columns_count}'111    new_id = str(uuid4())112    columns += [{'id': new_id, 'name': new_column_name, **DEFAULT_COLUMNS_OPTIONS}]113    # swap list index in order to always have the last column available114    columns[-1], columns[-2] = columns[-2], columns[-1]115    data_reference[new_id] = data_reference[NEW_COLUMNS_OPTIONS['id']]116    data_reference[NEW_COLUMNS_OPTIONS['id']] = ''117    return columns_count, columns, data_reference118def _handle_last_column_naming(data_reference, columns):119    new_id = str(uuid4())120    columns[-1]['id'] = new_id121    columns.append(NEW_COLUMNS_OPTIONS)122    data_reference[new_id] = ''123    return data_reference, columns124def get_table_from_dataframe(app: 'Dash', dataframe, index_col: List[str], id, table_kwargs):125    """126    :param app:127    :param dataframe:128    :param index_col:129    :param id:130    :param table_kwargs:131    :return:132    """133    datas, columns, columns_count = _init_datatable_values(dataframe, index_col)134    style_header_conditional = [{'if': {'column_id': NEW_COLUMNS_OPTIONS['id']},135                                 'fontStyle': 'italic', 'color': 'rgb(128, 128, 128)',136                                 
'backgroundColor': 'rgba(200, 200, 200, 0.1'}]137    style_header_conditional += [{'if': {'column_id': col}, **INDEX_STYLE} for col in index_col]138    style_data_conditional = [{'if': {'column_id': NEW_COLUMNS_OPTIONS['id']},139                               'fontStyle': 'italic', 'width': 'auto',140                               'color': 'rgb(128, 128, 128)',141                               'backgroundColor': 'rgba(200, 200, 200, 0.1'}]142    style_data_conditional += [{'if': {'column_id': col}, **INDEX_STYLE} for col in index_col]143    tooltip = {item['id']: {'value': item['name'], 'use_with': 'header'} for item in columns}144    table = DataTable(data=datas, columns=columns,145                      tooltip=tooltip,146                      id=_get_id('table', id),147                      row_selectable=False,148                      row_deletable=False,149                      fixed_columns={'headers': True, 'data': len(index_col)},150                      style_as_list_view=True,151                      style_table={'overflowX': 'auto', 'minWidth': '100%'},152                      style_data={'width': '5em', },153                      style_header={'width': '5em'},154                      style_header_conditional=style_header_conditional,155                      style_data_conditional=style_data_conditional,156                      **table_kwargs)157    table = dbc.Col(table, className='table-container')158    sidebar = [159        dbc.Button(id=_get_id('open-button', id), children='\u2630', className='side-bar-item'),160        dbc.Checkbox(id=_get_id('hide-rows-checkbox', id), value=False, label='Hide',161                     className='side-bar-item')]162    sidebar = html.Div(sidebar, className='side-bar', id=_get_id('side-bar', id))163    stores = [dcc.Store(id=_get_id('side-bar-status-store', id), data=False),164              dcc.Store(id=_get_id('columns-mapping-store', id), data=columns),165              dcc.Store(id=_get_id('columns-count-store', 
id), data=columns_count),166              dcc.Store(id=_get_id('table-data-store', id), data=datas)]167    layout = html.Div([table, sidebar, *stores], className='customdatatable-container')168    @app.callback(Output(_get_id('table', id), 'data'),169                  Output(_get_id('columns-count-store', id), 'data'),170                  Output(_get_id('table', id), 'columns'),171                  Output(_get_id('table-data-store', id), 'data'),172                  Input(_get_id('table', id), 'data'),173                  Input(_get_id('table', id), 'columns'),174                  Input(_get_id('hide-rows-checkbox', id), 'value'),175                  State(_get_id('table', id), 'data_previous'),176                  State(_get_id('table-data-store', id), 'data'),177                  State(_get_id('columns-count-store', id), 'data'))178    def on_data_change(data, columns, is_hidden, data_previous, data_reference, columns_count):179        # On change of the data of the table180        context = get_context()181        if context == (_get_id('table', id), 'data') and data_previous is not None:182            data = _as_dataframe(data)183            data_previous = _as_dataframe(data_previous)184            data_reference = _as_dataframe(data_reference)185            data, data_reference = _sync_change_with_data_reference(data, data_reference)186            # If change was in the last column, append a new column and keep the new column at the187            # end of the table188            if _has_last_column_values_changed(data, data_previous):189                columns_count, columns, data_reference = \190                    _append_in_last_column(data_reference, columns_count, columns)191                data, data_reference = _get_records(data_reference, is_hidden, columns)192            else:193                data, data_reference = _get_records(data_reference, is_hidden, columns)194                columns_count, columns = dash.no_update, dash.no_update195           
 # keep the table, and data store synchronized196            return data, columns_count, columns, data_reference197        # On change of the columns198        elif context == (_get_id('table', id), 'columns'):199            # if the change was in the last column, change its id and append it200            data = _as_dataframe(data)201            data_reference = _as_dataframe(data_reference)202            data, data_reference = _sync_change_with_data_reference(data, data_reference)203            if _is_last_column_renamed(columns):204                data_reference, columns = _handle_last_column_naming(data_reference, columns)205                data, data_reference = _get_records(data_reference, is_hidden, columns)206                return data, dash.no_update, columns, data_reference207            return dash.no_update, dash.no_update, dash.no_update, _to_records(data_reference)208        elif context == (_get_id('hide-rows-checkbox', id), 'value'):209            data_reference = _as_dataframe(data_reference)210            data, data_reference = _get_records(data_reference, is_hidden, columns)211            return data, dash.no_update, dash.no_update, dash.no_update212        else:213            raise PreventUpdate214    app.clientside_callback(ClientsideFunction('clientside', 'open_sidebar'),215                            Output(_get_id('side-bar', id), 'className'),216                            Output(_get_id('side-bar-status-store', id), 'data'),217                            Output(_get_id('open-button', id), 'children'),218                            Input(_get_id('open-button', id), 'n_clicks'),219                            State(_get_id('side-bar-status-store', id), 'data')220                            )221    app.clientside_callback(ClientsideFunction('clientside', 'synchronize_columns_mapping'),222                            Output(_get_id('columns-mapping-store', id), 'data'),223                            Input(_get_id('table', id), 'columns')224      
                      )225    app.clientside_callback(ClientsideFunction('clientside', 'synchronize_tooltip'),226                            Output(_get_id('table', id), 'tooltip'),227                            Input(_get_id('table', id), 'columns')228                            )...Coindelta.py
Source:Coindelta.py  
1import requests2import threading3import bin.Helpers as helper4"""5This class is responsible to retrieve the Best Bid (INR) data from Coindelta 6"""7class Coindelta(threading.Thread):8    # Constants9    _API_URL = "https://coindelta.com/api/v1/public/getticker/"10    _SUFFIX = "-inr"11    _LOOK_TAG = "MarketName"12    _GET_TAG = "Bid"13    def __init__(self, _this):14        threading.Thread.__init__(self)15        self._calling_class = _this16        self._get_id = ""17        return18    def run(self):19        if self._get_id != "":20            try:21                r = requests.get(self._API_URL)22                value = self._parse(r.text)23                self._calling_class.newY(value, self._get_id.upper())24            except Exception:25                self._calling_class.error("CONNECTION ERROR")26        else:27            self._calling_class.error("CURRENCY NOT SELECTED")28        return29    def set_id(self, _id):30        self._get_id = _id31        return32    def _parse(self, text):33        data = helper.loadJson(text=text)34        look_for = self._get_id.lower() + "" + self._SUFFIX35        for item in data:36            if item[self._LOOK_TAG] == look_for:37                return item[self._GET_TAG]...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
