How to use _get_id method in stestr

Best Python code snippet using stestr_python

dell_storagecenter_api.py

Source:dell_storagecenter_api.py Github

copy

Full Screen

...151 except AttributeError:152 LOG.error(_LE('Error invalid json: %s'),153 blob)154 return None155 def _get_id(self, blob):156 try:157 if isinstance(blob, dict):158 return blob.get('instanceId')159 except AttributeError:160 LOG.error(_LE('Invalid API object: %s'),161 blob)162 return None163 def open_connection(self):164 # Authenticate against EM165 payload = {}166 payload['Application'] = 'Cinder REST Driver'167 payload['ApplicationVersion'] = self.APIVERSION168 r = self.client.post('ApiConnection/Login',169 payload)170 if r.status_code != 200:171 LOG.error(_LE('Login error: %(c)d %(r)s'),172 {'c': r.status_code,173 'r': r.reason})174 raise exception.VolumeBackendAPIException(175 _('Failed to connect to Enterprise Manager'))176 def close_connection(self):177 r = self.client.post('ApiConnection/Logout',178 {})179 if r.status_code != 204:180 LOG.warning(_LW('Logout error: %(c)d %(r)s'),181 {'c': r.status_code,182 'r': r.reason})183 self.client = None184 def find_sc(self, ssn):185 '''This is really just a check that the sc is there and being managed by186 EM.187 '''188 r = self.client.get('StorageCenter/StorageCenter')189 result = self._get_result(r,190 'scSerialNumber',191 ssn)192 if result is None:193 LOG.error(_LE('Failed to find %(s)s. 
Result %(r)s'),194 {'s': ssn,195 'r': r})196 raise exception.VolumeBackendAPIException(197 _('Failed to find Storage Center'))198 return self._get_id(result)199 # Volume functions200 def _create_folder(self, url, ssn, parent, folder):201 '''This is generic to server and volume folders.202 '''203 f = None204 payload = {}205 payload['Name'] = folder206 payload['StorageCenter'] = ssn207 if parent != '':208 payload['Parent'] = parent209 payload['Notes'] = self.notes210 r = self.client.post(url,211 payload)212 if r.status_code != 201:213 LOG.debug('%(u)s error: %(c)d %(r)s',214 {'u': url,215 'c': r.status_code,216 'r': r.reason})217 else:218 f = self._first_result(r)219 return f220 def _create_folder_path(self, url, ssn, foldername):221 '''This is generic to server and volume folders.222 '''223 path = self._path_to_array(foldername)224 folderpath = ''225 instanceId = ''226 # Technically the first folder is the root so that is already created.227 found = True228 f = None229 for folder in path:230 folderpath = folderpath + folder231 # If the last was found see if this part of the path exists too232 if found:233 listurl = url + '/GetList'234 f = self._find_folder(listurl,235 ssn,236 folderpath)237 if f is None:238 found = False239 # We didn't find it so create it240 if found is False:241 f = self._create_folder(url,242 ssn,243 instanceId,244 folder)245 # If we haven't found a folder or created it then leave246 if f is None:247 LOG.error(_LE('Unable to create folder path %s'),248 folderpath)249 break250 # Next part of the path will need this251 instanceId = self._get_id(f)252 folderpath = folderpath + '/'253 return f254 def _find_folder(self, url, ssn, foldername):255 '''Most of the time the folder will already have been created so256 we look for the end folder and check that the rest of the path is257 right.258 This is generic to server and volume folders.259 '''260 pf = PayloadFilter()261 pf.append('scSerialNumber', ssn)262 basename = os.path.basename(foldername)263 
pf.append('Name', basename)264 # If we have any kind of path we add '/' to match the storage265 # center's convention and throw it into the filters.266 folderpath = os.path.dirname(foldername)267 if folderpath != '':268 folderpath += '/'269 pf.append('folderPath', folderpath)270 folder = None271 r = self.client.post(url,272 pf.payload)273 if r.status_code == 200:274 folder = self._get_result(r,275 'folderPath',276 folderpath)277 else:278 LOG.debug('%(u)s error: %(c)d %(r)s',279 {'u': url,280 'c': r.status_code,281 'r': r.reason})282 return folder283 def _create_volume_folder_path(self, ssn, foldername):284 return self._create_folder_path('StorageCenter/ScVolumeFolder',285 ssn,286 foldername)287 def _find_volume_folder(self, ssn, foldername):288 return self._find_folder('StorageCenter/ScVolumeFolder/GetList',289 ssn,290 foldername)291 def _init_volume(self, scvolume):292 '''Maps the volume to a random server and immediately unmaps293 it. This initializes the volume.294 Don't wig out if this fails.295 '''296 pf = PayloadFilter()297 pf.append('scSerialNumber', scvolume.get('scSerialNumber'), 'Equals')298 r = self.client.post('StorageCenter/ScServer/GetList', pf.payload)299 if r.status_code == 200:300 scservers = self._get_json(r)301 # Sort through the servers looking for one with connectivity.302 for scserver in scservers:303 # TODO(tom_swanson): Add check for server type.304 # This needs to be either a physical or virtual server.305 # Outside of tempest tests this should not matter as we only306 # "init" a volume to allow snapshotting of an empty volume.307 if scserver.get('status', '').lower() != 'down':308 # Map to actually create the volume309 self.map_volume(scvolume,310 scserver)311 self.unmap_volume(scvolume,312 scserver)313 break314 def create_volume(self, name, size, ssn, volfolder):315 '''This creates a new volume on the storage center. It316 will create it in volfolder. If volfolder does not317 exist it will create it. 
If it cannot create volfolder318 the volume will be created in the root.319 '''320 scvolume = None321 # Find our folder322 LOG.debug('Create Volume %(name)s %(ssn)s %(folder)s',323 {'name': name,324 'ssn': ssn,325 'folder': volfolder})326 folder = self._find_volume_folder(ssn,327 volfolder)328 # Doesn't exist? make it329 if folder is None:330 folder = self._create_volume_folder_path(ssn,331 volfolder)332 # If we actually have a place to put our volume create it333 if folder is None:334 LOG.error(_LE('Unable to create folder %s'),335 volfolder)336 # Create the volume337 payload = {}338 payload['Name'] = name339 payload['Notes'] = self.notes340 payload['Size'] = '%d GB' % size341 payload['StorageCenter'] = ssn342 if folder is not None:343 payload['VolumeFolder'] = self._get_id(folder)344 r = self.client.post('StorageCenter/ScVolume',345 payload)346 if r.status_code == 201:347 scvolume = self._get_json(r)348 else:349 LOG.error(_LE('ScVolume create error %(name)s: %(c)d %(r)s'),350 {'name': name,351 'c': r.status_code,352 'r': r.reason})353 if scvolume:354 LOG.info(_LI('Created volume %(instanceId)s: %(name)s'),355 {'instanceId': scvolume['instanceId'],356 'name': scvolume['name']})357 else:358 LOG.error(_LE('ScVolume returned success with empty payload.'359 ' Attempting to locate volume'))360 # In theory it is there since success was returned.361 # Try one last time to find it before returning.362 scvolume = self.find_volume(ssn, name, None)363 return scvolume364 def find_volume(self, ssn, name=None, instanceid=None):365 '''search ssn for volume of name and/or instance id366 '''367 LOG.debug('finding volume %(sn)s : %(name)s : %(id)s',368 {'sn': ssn,369 'name': name,370 'id': instanceid})371 pf = PayloadFilter()372 pf.append('scSerialNumber', ssn)373 # We need at least a name and or an instance id. 
If we have374 # that we can find a volume.375 if instanceid is not None:376 pf.append('instanceId', instanceid)377 elif name is not None:378 pf.append('Name', name)379 else:380 return None381 r = self.client.post('StorageCenter/ScVolume/GetList',382 pf.payload)383 if r.status_code != 200:384 LOG.debug('ScVolume GetList error %(i)s: %(c)d %(r)s',385 {'i': instanceid,386 'c': r.status_code,387 'r': r.reason})388 return self._first_result(r)389 def delete_volume(self, ssn, name):390 # find our volume391 vol = self.find_volume(ssn, name, None)392 if vol is not None:393 r = self.client.delete('StorageCenter/ScVolume/%s'394 % self._get_id(vol))395 if r.status_code != 200:396 raise exception.VolumeBackendAPIException(397 _('Error deleting volume %(ssn)s: %(sn)s: %(c)d %(r)s') %398 {'ssn': ssn,399 'sn': name,400 'c': r.status_code,401 'r': r.reason})402 # json return should be true or false403 return self._get_json(r)404 LOG.warning(_LW('delete_volume: unable to find volume %s'),405 name)406 # If we can't find the volume then it is effectively gone.407 return True408 def _create_server_folder_path(self, ssn, foldername):409 return self._create_folder_path('StorageCenter/ScServerFolder',410 ssn,411 foldername)412 def _find_server_folder(self, ssn, foldername):413 return self._find_folder('StorageCenter/ScServerFolder/GetList',414 ssn,415 foldername)416 def _add_hba(self, scserver, wwnoriscsiname, isfc=False):417 '''Adds an HBA to the scserver. 
The HBA will be added418 even if it has not been seen by the storage center.419 '''420 payload = {}421 if isfc is True:422 payload['HbaPortType'] = 'FibreChannel'423 else:424 payload['HbaPortType'] = 'Iscsi'425 payload['WwnOrIscsiName'] = wwnoriscsiname426 payload['AllowManual'] = True427 r = self.client.post('StorageCenter/ScPhysicalServer/%s/AddHba'428 % self._get_id(scserver),429 payload)430 if r.status_code != 200:431 LOG.error(_LE('AddHba error: %(i)s to %(s)s : %(c)d %(r)s'),432 {'i': wwnoriscsiname,433 's': scserver['name'],434 'c': r.status_code,435 'r': r.reason})436 return False437 return True438 # We do not know that we are red hat linux 6.x but that works439 # best for red hat and ubuntu. So, there.440 def _find_serveros(self, ssn, osname='Red Hat Linux 6.x'):441 '''Returns the serveros instance id of the specified osname.442 Required to create a server.443 '''444 pf = PayloadFilter()445 pf.append('scSerialNumber', ssn)446 r = self.client.post('StorageCenter/ScServerOperatingSystem/GetList',447 pf.payload)448 if r.status_code == 200:449 oslist = self._get_json(r)450 for srvos in oslist:451 name = srvos.get('name', 'nope')452 if name.lower() == osname.lower():453 # Found it return the id454 return self._get_id(srvos)455 LOG.warning(_LW('ScServerOperatingSystem GetList return: %(c)d %(r)s'),456 {'c': r.status_code,457 'r': r.reason})458 return None459 def create_server_multiple_hbas(self, ssn, foldername, wwns):460 '''Same as create_server except it can take a list of hbas. hbas461 can be wwns or iqns.462 '''463 # Add hbas464 scserver = None465 # Our instance names466 for wwn in wwns:467 if scserver is None:468 # Use the fist wwn to create the server.469 scserver = self.create_server(ssn,470 foldername,471 wwn,472 True)473 else:474 # Add the wwn to our server475 self._add_hba(scserver,476 wwn,477 True)478 return scserver479 def create_server(self, ssn, foldername, wwnoriscsiname, isfc=False):480 '''creates a server on the the storage center ssn. 
Adds the first481 HBA to it.482 '''483 scserver = None484 payload = {}485 payload['Name'] = 'Server_' + wwnoriscsiname486 payload['StorageCenter'] = ssn487 payload['Notes'] = self.notes488 # We pick Red Hat Linux 6.x because it supports multipath and489 # will attach luns to paths as they are found.490 scserveros = self._find_serveros(ssn, 'Red Hat Linux 6.x')491 if scserveros is not None:492 payload['OperatingSystem'] = scserveros493 # Find our folder or make it494 folder = self._find_server_folder(ssn,495 foldername)496 if folder is None:497 folder = self._create_server_folder_path(ssn,498 foldername)499 # At this point it doesn't matter if the folder was created or not.500 # We just attempt to create the server. Let it be in the root if501 # the folder creation fails.502 if folder is not None:503 payload['ServerFolder'] = self._get_id(folder)504 # create our server505 r = self.client.post('StorageCenter/ScPhysicalServer',506 payload)507 if r.status_code != 201:508 LOG.error(_LE('ScPhysicalServer create error: %(i)s: %(c)d %(r)s'),509 {'i': wwnoriscsiname,510 'c': r.status_code,511 'r': r.reason})512 else:513 # Server was created514 scserver = self._first_result(r)515 # Add hba to our server516 if scserver is not None:517 if not self._add_hba(scserver,518 wwnoriscsiname,519 isfc):520 LOG.error(_LE('Error adding HBA to server'))521 # Can't have a server without an HBA522 self._delete_server(scserver)523 scserver = None524 # Success or failure is determined by the caller525 return scserver526 def find_server(self, ssn, instance_name):527 '''Hunts for a server by looking for an HBA with the server's IQN528 or wwn.529 If found, the server the HBA is attached to, if any, is returned.530 '''531 scserver = None532 # We search for our server by first finding our HBA533 hba = self._find_serverhba(ssn, instance_name)534 # Once created hbas stay in the system. 
So it isn't enough535 # that we found one it actually has to be attached to a536 # server.537 if hba is not None and hba.get('server') is not None:538 pf = PayloadFilter()539 pf.append('scSerialNumber', ssn)540 pf.append('instanceId', self._get_id(hba['server']))541 r = self.client.post('StorageCenter/ScServer/GetList',542 pf.payload)543 if r.status_code != 200:544 LOG.error(_LE('ScServer error: %(c)d %(r)s'),545 {'c': r.status_code,546 'r': r.reason})547 else:548 scserver = self._first_result(r)549 if scserver is None:550 LOG.debug('Server (%s) not found.',551 instance_name)552 return scserver553 def _find_serverhba(self, ssn, instance_name):554 '''Hunts for a sc server HBA by looking for an HBA with the555 server's IQN or wwn.556 If found, the sc server HBA is returned.557 '''558 scserverhba = None559 # We search for our server by first finding our HBA560 pf = PayloadFilter()561 pf.append('scSerialNumber', ssn)562 pf.append('instanceName', instance_name)563 r = self.client.post('StorageCenter/ScServerHba/GetList',564 pf.payload)565 if r.status_code != 200:566 LOG.debug('ScServerHba error: %(c)d %(r)s',567 {'c': r.status_code,568 'r': r.reason})569 else:570 scserverhba = self._first_result(r)571 return scserverhba572 def _find_domains(self, cportid):573 r = self.client.get('StorageCenter/ScControllerPort/%s/FaultDomainList'574 % cportid)575 if r.status_code == 200:576 domains = self._get_json(r)577 return domains578 else:579 LOG.debug('FaultDomainList error: %(c)d %(r)s',580 {'c': r.status_code,581 'r': r.reason})582 LOG.error(_LE('Error getting FaultDomainList'))583 return None584 def _find_domain(self, cportid, domainip):585 '''Returns the fault domain which a given controller port can586 be seen by the server587 '''588 domains = self._find_domains(cportid)589 if domains:590 # Wiffle through the domains looking for our591 # configured ip.592 for domain in domains:593 # If this is us we return the port.594 if domain.get('targetIpv4Address',595 
domain.get('wellKnownIpAddress')) == domainip:596 return domain597 return None598 def _find_fc_initiators(self, scserver):599 '''_find_fc_initiators600 returns the server's fc HBA's wwns601 '''602 initiators = []603 r = self.client.get('StorageCenter/ScServer/%s/HbaList'604 % self._get_id(scserver))605 if r.status_code == 200:606 hbas = self._get_json(r)607 for hba in hbas:608 wwn = hba.get('instanceName')609 if (hba.get('portType') == 'FibreChannel' and610 wwn is not None):611 initiators.append(wwn)612 else:613 LOG.debug('HbaList error: %(c)d %(r)s',614 {'c': r.status_code,615 'r': r.reason})616 LOG.error(_LE('Unable to find FC intitiators'))617 return initiators618 def get_volume_count(self, scserver):619 r = self.client.get('StorageCenter/ScServer/%s/MappingList'620 % self._get_id(scserver))621 if r.status_code == 200:622 mappings = self._get_json(r)623 return len(mappings)624 # Panic mildly but do not return 0.625 return -1626 def _find_mappings(self, scvolume):627 '''find mappings628 returns the volume's mappings629 '''630 mappings = []631 if scvolume.get('active', False):632 r = self.client.get('StorageCenter/ScVolume/%s/MappingList'633 % self._get_id(scvolume))634 if r.status_code == 200:635 mappings = self._get_json(r)636 else:637 LOG.debug('MappingList error: %(c)d %(r)s',638 {'c': r.status_code,639 'r': r.reason})640 LOG.error(_LE('Unable to find volume mappings: %s'),641 scvolume.get('name'))642 else:643 LOG.error(_LE('_find_mappings: volume is not active'))644 return mappings645 def _find_controller_port(self, cportid):646 '''_find_controller_port647 returns the controller port dict648 '''649 controllerport = None650 r = self.client.get('StorageCenter/ScControllerPort/%s'651 % cportid)652 if r.status_code == 200:653 controllerport = self._first_result(r)654 else:655 LOG.debug('ScControllerPort error: %(c)d %(r)s',656 {'c': r.status_code,657 'r': r.reason})658 LOG.error(_LE('Unable to find controller port: %s'),659 cportid)660 return controllerport661 
def find_wwns(self, scvolume, scserver):662 '''returns the lun and wwns of the mapped volume'''663 # Our returnables664 lun = None # our lun. We return the first lun.665 wwns = [] # list of targets666 itmap = {} # dict of initiators and the associated targets667 # Make sure we know our server's initiators. Only return668 # mappings that contain HBA for this server.669 initiators = self._find_fc_initiators(scserver)670 # Get our volume mappings671 mappings = self._find_mappings(scvolume)672 if len(mappings) > 0:673 # We check each of our mappings. We want to return674 # the mapping we have been configured to use.675 for mapping in mappings:676 # Find the controller port for this mapping677 cport = mapping.get('controllerPort')678 controllerport = self._find_controller_port(679 self._get_id(cport))680 if controllerport is not None:681 # This changed case at one point or another.682 # Look for both keys.683 wwn = controllerport.get('wwn',684 controllerport.get('WWN'))685 if wwn is None:686 LOG.error(_LE('Find_wwns: Unable to find port wwn'))687 serverhba = mapping.get('serverHba')688 if wwn is not None and serverhba is not None:689 hbaname = serverhba.get('instanceName')690 if hbaname in initiators:691 if itmap.get(hbaname) is None:692 itmap[hbaname] = []693 itmap[hbaname].append(wwn)694 wwns.append(wwn)695 mappinglun = mapping.get('lun')696 if lun is None:697 lun = mappinglun698 elif lun != mappinglun:699 LOG.warning(_LW('Inconsistent Luns.'))700 else:701 LOG.error(_LE('Find_wwns: Volume appears unmapped'))702 LOG.debug(lun)703 LOG.debug(wwns)704 LOG.debug(itmap)705 # TODO(tom_swanson): if we have nothing to return raise an exception706 # here. We can't do anything with an unmapped volume. 
We shouldn't707 # pretend we succeeded.708 return lun, wwns, itmap709 def _find_active_controller(self, scvolume):710 LOG.debug('find_active_controller')711 activecontroller = None712 r = self.client.get('StorageCenter/ScVolume/%s/VolumeConfiguration'713 % self._get_id(scvolume))714 if r.status_code == 200:715 volumeconfiguration = self._first_result(r)716 controller = volumeconfiguration.get('controller')717 activecontroller = self._get_id(controller)718 LOG.debug('activecontroller %s', activecontroller)719 return activecontroller720 def find_iscsi_properties(self, scvolume, ip=None, port=None):721 LOG.debug('enter find_iscsi_properties')722 LOG.debug('scvolume: %s', scvolume)723 activeindex = -1724 luns = []725 iqns = []726 portals = []727 access_mode = 'rw'728 mappings = self._find_mappings(scvolume)729 activecontroller = self._find_active_controller(scvolume)730 if len(mappings) > 0:731 for mapping in mappings:732 LOG.debug('mapping: %s', mapping)733 # find the controller port for this mapping734 cport = mapping.get('controllerPort')735 cportid = self._get_id(cport)736 domains = self._find_domains(cportid)737 if domains:738 controllerport = self._find_controller_port(cportid)739 LOG.debug('controllerport: %s', controllerport)740 if controllerport is not None:741 appendproperties = False742 for d in domains:743 LOG.debug('domain: %s', d)744 ipaddress = d.get('targetIpv4Address',745 d.get('wellKnownIpAddress'))746 portnumber = d.get('portNumber')747 if ((ip is None or ip == ipaddress) and748 (port is None or port == portnumber)):749 portal = (ipaddress + ':' +750 six.text_type(portnumber))751 # I'm not sure when we can have more than752 # one portal for a domain but since it is an753 # array being returned it is best to check.754 if portals.count(portal) == 0:755 appendproperties = True756 portals.append(portal)757 else:758 LOG.debug('Domain %s has two portals.',759 self._get_id(d))760 # We do not report lun and iqn info unless it is for761 # the configured port 
OR the user has not enabled762 # multipath. (In which case ip and port sent in763 # will be None).764 if appendproperties is True:765 iqns.append(controllerport.get('iscsiName'))766 luns.append(mapping.get('lun'))767 if activeindex == -1:768 controller = controllerport.get('controller')769 controllerid = self._get_id(controller)770 if controllerid == activecontroller:771 activeindex = len(iqns) - 1772 if mapping['readOnly'] is True:773 access_mode = 'ro'774 if activeindex == -1:775 LOG.debug('Volume is not yet active on any controller.')776 activeindex = 0777 data = {'target_discovered': False,778 'target_iqns': iqns,779 'target_portals': portals,780 'target_luns': luns,781 'access_mode': access_mode782 }783 LOG.debug('find_iscsi_properties return: %s', data)784 return activeindex, data785 def map_volume(self, scvolume, scserver):786 '''map_volume787 The check for server existence is elsewhere; does not create the788 server.789 '''790 # Make sure we have what we think we have791 serverid = self._get_id(scserver)792 volumeid = self._get_id(scvolume)793 if serverid is not None and volumeid is not None:794 payload = {}795 payload['server'] = serverid796 advanced = {}797 advanced['MapToDownServerHbas'] = True798 payload['Advanced'] = advanced799 r = self.client.post('StorageCenter/ScVolume/%s/MapToServer'800 % volumeid,801 payload)802 if r.status_code == 200:803 # We just return our mapping804 return self._first_result(r)805 # Should not be here.806 LOG.debug('MapToServer error: %(c)d %(r)s',807 {'c': r.status_code,808 'r': r.reason})809 # Error out810 LOG.error(_LE('Unable to map %(vol)s to %(srv)s'),811 {'vol': scvolume['name'],812 'srv': scserver['name']})813 return None814 def unmap_volume(self, scvolume, scserver):815 '''unmap_volume816 deletes all mappings to a server, not just the ones on the path817 defined in cinder.conf.818 '''819 rtn = True820 serverid = self._get_id(scserver)821 volumeid = self._get_id(scvolume)822 if serverid is not None and volumeid is 
not None:823 r = self.client.get('StorageCenter/ScVolume/%s/MappingProfileList'824 % volumeid)825 if r.status_code == 200:826 profiles = self._get_json(r)827 for profile in profiles:828 prosrv = profile.get('server')829 if prosrv is not None and self._get_id(prosrv) == serverid:830 r = self.client.delete(831 'StorageCenter/ScMappingProfile/%s'832 % self._get_id(profile))833 if (r.status_code != 200 or r.ok is False):834 LOG.debug('ScMappingProfile error: %(c)d %(r)s',835 {'c': r.status_code,836 'r': r.reason})837 LOG.error(_LE('Unable to unmap Volume %s'),838 volumeid)839 # 1 failed unmap is as good as 100.840 # Fail it and leave841 rtn = False842 break843 LOG.debug('Volume %(v)s unmapped from %(s)s',844 {'v': volumeid,845 's': serverid})846 else:847 LOG.debug('MappingProfileList error: %(c)d %(r)s',848 {'c': r.status_code,849 'r': r.reason})850 rtn = False851 return rtn852 def get_storage_usage(self, ssn):853 '''get_storage_usage'''854 storageusage = None855 if ssn is not None:856 r = self.client.get('StorageCenter/StorageCenter/%s/StorageUsage'857 % ssn)858 if r.status_code == 200:859 storageusage = self._get_json(r)860 else:861 LOG.debug('StorageUsage error: %(c)d %(r)s',862 {'c': r.status_code,863 'r': r.reason})864 return storageusage865 def create_replay(self, scvolume, replayid, expire):866 '''create_replay867 expire is in minutes.868 one could snap a volume before it has been activated, so activate869 by mapping and unmapping to a random server and let them. 
This870 should be a fail but the Tempest tests require it.871 '''872 replay = None873 if scvolume is not None:874 if (scvolume.get('active') is not True or875 scvolume.get('replayAllowed') is not True):876 self._init_volume(scvolume)877 payload = {}878 payload['description'] = replayid879 payload['expireTime'] = expire880 r = self.client.post('StorageCenter/ScVolume/%s/CreateReplay'881 % self._get_id(scvolume),882 payload)883 if r.status_code != 200:884 LOG.debug('CreateReplay error: %(c)d %(r)s',885 {'c': r.status_code,886 'r': r.reason})887 LOG.error(_LE('Error creating replay.'))888 else:889 replay = self._first_result(r)890 return replay891 def find_replay(self, scvolume, replayid):892 '''find_replay893 searches for the replay by replayid which we store in the894 replay's description attribute895 '''896 replay = None897 r = self.client.get('StorageCenter/ScVolume/%s/ReplayList'898 % self._get_id(scvolume))899 try:900 content = self._get_json(r)901 # This will be a list. If it isn't bail902 if isinstance(content, list):903 for r in content:904 # The only place to save our information with the public905 # api is the description field which isn't quite long906 # enough. 
So we check that our description is pretty much907 # the max length and we compare that to the start of908 # the snapshot id.909 description = r.get('description')910 if (len(description) >= 30 and911 replayid.startswith(description) is True and912 r.get('markedForExpiration') is not True):913 replay = r914 break915 except Exception:916 LOG.error(_LE('Invalid ReplayList return: %s'),917 r)918 if replay is None:919 LOG.debug('Unable to find snapshot %s',920 replayid)921 return replay922 def delete_replay(self, scvolume, replayid):923 '''delete_replay924 hunts down a replay by replayid string and expires it.925 once marked for expiration we do not return the replay as926 a snapshot.927 '''928 LOG.debug('Expiring replay %s', replayid)929 replay = self.find_replay(scvolume,930 replayid)931 if replay is not None:932 r = self.client.post('StorageCenter/ScReplay/%s/Expire'933 % self._get_id(replay),934 {})935 if r.status_code != 204:936 LOG.debug('ScReplay Expire error: %(c)d %(r)s',937 {'c': r.status_code,938 'r': r.reason})939 return False940 # We either couldn't find it or expired it.941 return True942 def create_view_volume(self, volname, volfolder, screplay):943 '''create_view_volume944 creates a new volume named volname in the folder945 volfolder from the screplay.946 '''947 # find our ssn and get our folder948 ssn = screplay.get('scSerialNumber')949 folder = self._find_volume_folder(ssn,950 volfolder)951 # Doesn't exist? 
make it952 if folder is None:953 folder = self._create_volume_folder_path(ssn,954 volfolder)955 # payload is just the volume name and folder if we have one.956 payload = {}957 payload['Name'] = volname958 payload['Notes'] = self.notes959 if folder is not None:960 payload['VolumeFolder'] = self._get_id(folder)961 r = self.client.post('StorageCenter/ScReplay/%s/CreateView'962 % self._get_id(screplay),963 payload)964 volume = None965 if r.status_code == 200:966 volume = self._first_result(r)967 else:968 LOG.debug('ScReplay CreateView error: %(c)d %(r)s',969 {'c': r.status_code,970 'r': r.reason})971 if volume is None:972 LOG.error(_LE('Unable to create volume %s from replay'),973 volname)974 return volume975 def create_cloned_volume(self, volumename, volumefolder, scvolume):976 '''create_cloned_volume977 creates a temporary replay and then creates a978 view volume from that.979 '''980 clone = None981 replay = self.create_replay(scvolume,982 'Cinder Clone Replay',983 60)984 if replay is not None:985 clone = self.create_view_volume(volumename,986 volumefolder,987 replay)988 else:989 LOG.error(_LE('Error: unable to snap replay'))990 return clone991 def expand_volume(self, scvolume, newsize):992 '''expand_volume'''993 payload = {}994 payload['NewSize'] = '%d GB' % newsize995 r = self.client.post('StorageCenter/ScVolume/%s/ExpandToSize'996 % self._get_id(scvolume),997 payload)998 vol = None999 if r.status_code == 200:1000 vol = self._get_json(r)1001 else:1002 LOG.error(_LE('Error expanding volume %(n)s: %(c)d %(r)s'),1003 {'n': scvolume['name'],1004 'c': r.status_code,1005 'r': r.reason})1006 if vol is not None:1007 LOG.debug('Volume expanded: %(i)s %(s)s',1008 {'i': vol['instanceId'],1009 's': vol['configuredSize']})1010 return vol1011 def _delete_server(self, scserver):1012 '''_delete_server1013 Just give it a shot. 
If it fails it doesn't matter to cinder.1014 '''1015 if scserver.get('deleteAllowed') is True:1016 r = self.client.delete('StorageCenter/ScServer/%s'1017 % self._get_id(scserver))1018 LOG.debug('ScServer %(i)s delete return: %(c)d %(r)s',1019 {'i': self._get_id(scserver),1020 'c': r.status_code,1021 'r': r.reason})1022 else:...

Full Screen

Full Screen

table.py

Source:table.py Github

copy

Full Screen

...21 'id': 'new_col_id',22 'renamable': True,23 'type': 'numeric'}24INDEX_STYLE = {'textAlign': 'left', 'fontWeight': 'bold'}25def _get_id(subcomponent_id, aio_id):26 return {27 'component': 'CustomDataTable',28 'subcomponent': subcomponent_id,29 'aio_id': aio_id30 }31def _init_datatable_values(dataframe: pd.DataFrame, index_col: List[str]):32 """Initialize main Dash.DataTable parameters33 :param dataframe: Dataframe containing the values to generate the table from. The index are not34 significant. However the columns are.35 :param index_col:36 :return: List[Dict], List[Dict], int37 """38 # generate mapping between columns name and unique id39 columns = [{"name": col, "id": str(uuid4()), **DEFAULT_COLUMNS_OPTIONS} if col not in index_col40 else {"name": col, "id": col, **INDEX_COLUMNS_OPTIONS}41 for col in dataframe.columns]42 columns_count = len(columns) - 143 datas = dataframe.copy()44 datas['id'] = np.arange(datas.shape[0])45 datas = datas. \46 rename(columns={value['name']: value['id'] for value in columns}). \47 to_dict('records')48 # append the last column49 if all([col != NEW_COLUMNS_OPTIONS['id'] for col in columns]):50 columns += [NEW_COLUMNS_OPTIONS]51 for item in datas:52 item[NEW_COLUMNS_OPTIONS['id']] = ''53 return datas, columns, columns_count54def _as_dataframe(records: List[Dict]) -> pd.DataFrame:55 """Transform records into dataframe. Also takes care of None and nan values.56 :param records: Expects a List of dictionary with at least 'id' as key.57 :return: DataFrame with id column as index58 """59 return pd.DataFrame(records). \60 replace([None], np.nan). \61 replace(np.nan, ''). \62 set_index('id')63def _to_records(dataframe: pd.DataFrame) -> List[Dict]:64 """Transform dataframe into records. Replace empty strings by Nan values65 :param dataframe: Expects a dataframe with 'id' as index.66 :return: List of dictionaries67 """68 return dataframe. \69 replace('', np.nan). \70 reset_index(). 
\71 to_dict('records')72def _get_visible_records(dataframe: pd.DataFrame, is_hidden: bool, columns: List[Dict]) -> \73 List[Dict]:74 """75 :param dataframe: Expects empty strings when76 :param is_hidden: When True, return only77 :param columns: List of dictionaries. Expects 'type' as key holding either 'text' or78 'numeric' values.79 :return:80 """81 if not is_hidden:82 return _to_records(dataframe)83 else:84 # get rows that are not empty (-1 because of label in first column)85 len_index_col = sum([col['type'] == 'text' for col in columns])86 mask = (dataframe != '').sum(axis=1) - len_index_col != 087 return _to_records(dataframe[mask])88def _has_last_column_values_changed(data: pd.DataFrame, data_previous: pd.DataFrame) -> bool:89 change = data != data_previous90 changed_column = data.columns[change.any(axis=0)]91 return NEW_COLUMNS_OPTIONS['id'] in changed_column92def _is_last_column_renamed(columns: List[Dict]) -> bool:93 return columns[-1]['name'] != NEW_COLUMNS_OPTIONS['name']94def _sync_change_with_data_reference(datas: pd.DataFrame, datas_reference: pd.DataFrame) -> \95 Tuple[pd.DataFrame, pd.DataFrame]:96 """Synchronize data_reference with datas. Reindex datas_reference columns with datas columns in97 case columns deletion. 
Update datas_reference values with datas values on common position."""98 datas_reference = datas_reference.reindex(columns=datas.columns)99 datas_reference.loc[datas.index, datas.columns] = datas100 return datas, datas_reference101def _get_records(datas_reference: pd.DataFrame, is_hidden: bool, columns: List[Dict]) -> \102 Tuple[List[Dict], List[Dict]]:103 """Returns datas_reference as records and datas as visible records."""104 data = _get_visible_records(datas_reference, is_hidden, columns)105 datas_reference = _to_records(datas_reference)106 return data, datas_reference107def _append_in_last_column(data_reference: pd.DataFrame, columns_count: int,108 columns: List[Dict]):109 columns_count += 1110 new_column_name = f'Col {columns_count}'111 new_id = str(uuid4())112 columns += [{'id': new_id, 'name': new_column_name, **DEFAULT_COLUMNS_OPTIONS}]113 # swap list index in order to always have the last column available114 columns[-1], columns[-2] = columns[-2], columns[-1]115 data_reference[new_id] = data_reference[NEW_COLUMNS_OPTIONS['id']]116 data_reference[NEW_COLUMNS_OPTIONS['id']] = ''117 return columns_count, columns, data_reference118def _handle_last_column_naming(data_reference, columns):119 new_id = str(uuid4())120 columns[-1]['id'] = new_id121 columns.append(NEW_COLUMNS_OPTIONS)122 data_reference[new_id] = ''123 return data_reference, columns124def get_table_from_dataframe(app: 'Dash', dataframe, index_col: List[str], id, table_kwargs):125 """126 :param app:127 :param dataframe:128 :param index_col:129 :param id:130 :param table_kwargs:131 :return:132 """133 datas, columns, columns_count = _init_datatable_values(dataframe, index_col)134 style_header_conditional = [{'if': {'column_id': NEW_COLUMNS_OPTIONS['id']},135 'fontStyle': 'italic', 'color': 'rgb(128, 128, 128)',136 'backgroundColor': 'rgba(200, 200, 200, 0.1'}]137 style_header_conditional += [{'if': {'column_id': col}, **INDEX_STYLE} for col in index_col]138 style_data_conditional = [{'if': 
{'column_id': NEW_COLUMNS_OPTIONS['id']},139 'fontStyle': 'italic', 'width': 'auto',140 'color': 'rgb(128, 128, 128)',141 'backgroundColor': 'rgba(200, 200, 200, 0.1'}]142 style_data_conditional += [{'if': {'column_id': col}, **INDEX_STYLE} for col in index_col]143 tooltip = {item['id']: {'value': item['name'], 'use_with': 'header'} for item in columns}144 table = DataTable(data=datas, columns=columns,145 tooltip=tooltip,146 id=_get_id('table', id),147 row_selectable=False,148 row_deletable=False,149 fixed_columns={'headers': True, 'data': len(index_col)},150 style_as_list_view=True,151 style_table={'overflowX': 'auto', 'minWidth': '100%'},152 style_data={'width': '5em', },153 style_header={'width': '5em'},154 style_header_conditional=style_header_conditional,155 style_data_conditional=style_data_conditional,156 **table_kwargs)157 table = dbc.Col(table, className='table-container')158 sidebar = [159 dbc.Button(id=_get_id('open-button', id), children='\u2630', className='side-bar-item'),160 dbc.Checkbox(id=_get_id('hide-rows-checkbox', id), value=False, label='Hide',161 className='side-bar-item')]162 sidebar = html.Div(sidebar, className='side-bar', id=_get_id('side-bar', id))163 stores = [dcc.Store(id=_get_id('side-bar-status-store', id), data=False),164 dcc.Store(id=_get_id('columns-mapping-store', id), data=columns),165 dcc.Store(id=_get_id('columns-count-store', id), data=columns_count),166 dcc.Store(id=_get_id('table-data-store', id), data=datas)]167 layout = html.Div([table, sidebar, *stores], className='customdatatable-container')168 @app.callback(Output(_get_id('table', id), 'data'),169 Output(_get_id('columns-count-store', id), 'data'),170 Output(_get_id('table', id), 'columns'),171 Output(_get_id('table-data-store', id), 'data'),172 Input(_get_id('table', id), 'data'),173 Input(_get_id('table', id), 'columns'),174 Input(_get_id('hide-rows-checkbox', id), 'value'),175 State(_get_id('table', id), 'data_previous'),176 State(_get_id('table-data-store', id), 
'data'),177 State(_get_id('columns-count-store', id), 'data'))178 def on_data_change(data, columns, is_hidden, data_previous, data_reference, columns_count):179 # On change of the data of the table180 context = get_context()181 if context == (_get_id('table', id), 'data') and data_previous is not None:182 data = _as_dataframe(data)183 data_previous = _as_dataframe(data_previous)184 data_reference = _as_dataframe(data_reference)185 data, data_reference = _sync_change_with_data_reference(data, data_reference)186 # If change was in the last column, append a new column and keep the new column at the187 # end of the table188 if _has_last_column_values_changed(data, data_previous):189 columns_count, columns, data_reference = \190 _append_in_last_column(data_reference, columns_count, columns)191 data, data_reference = _get_records(data_reference, is_hidden, columns)192 else:193 data, data_reference = _get_records(data_reference, is_hidden, columns)194 columns_count, columns = dash.no_update, dash.no_update195 # keep the table, and data store synchronized196 return data, columns_count, columns, data_reference197 # On change of the columns198 elif context == (_get_id('table', id), 'columns'):199 # if the change was in the last column, change its id and append it200 data = _as_dataframe(data)201 data_reference = _as_dataframe(data_reference)202 data, data_reference = _sync_change_with_data_reference(data, data_reference)203 if _is_last_column_renamed(columns):204 data_reference, columns = _handle_last_column_naming(data_reference, columns)205 data, data_reference = _get_records(data_reference, is_hidden, columns)206 return data, dash.no_update, columns, data_reference207 return dash.no_update, dash.no_update, dash.no_update, _to_records(data_reference)208 elif context == (_get_id('hide-rows-checkbox', id), 'value'):209 data_reference = _as_dataframe(data_reference)210 data, data_reference = _get_records(data_reference, is_hidden, columns)211 return data, dash.no_update, 
dash.no_update, dash.no_update212 else:213 raise PreventUpdate214 app.clientside_callback(ClientsideFunction('clientside', 'open_sidebar'),215 Output(_get_id('side-bar', id), 'className'),216 Output(_get_id('side-bar-status-store', id), 'data'),217 Output(_get_id('open-button', id), 'children'),218 Input(_get_id('open-button', id), 'n_clicks'),219 State(_get_id('side-bar-status-store', id), 'data')220 )221 app.clientside_callback(ClientsideFunction('clientside', 'synchronize_columns_mapping'),222 Output(_get_id('columns-mapping-store', id), 'data'),223 Input(_get_id('table', id), 'columns')224 )225 app.clientside_callback(ClientsideFunction('clientside', 'synchronize_tooltip'),226 Output(_get_id('table', id), 'tooltip'),227 Input(_get_id('table', id), 'columns')228 )...

Full Screen

Full Screen

Coindelta.py

Source:Coindelta.py Github

copy

Full Screen

import threading

import requests

import bin.Helpers as helper


class Coindelta(threading.Thread):
    """Retrieve the Best Bid (INR) data from Coindelta on a worker thread.

    The result is delivered to the calling object through its ``newY(value, name)`` method;
    failures are reported through its ``error(message)`` method.
    """

    # Constants
    _API_URL = "https://coindelta.com/api/v1/public/getticker/"
    _SUFFIX = "-inr"
    _LOOK_TAG = "MarketName"
    _GET_TAG = "Bid"
    # Seconds to wait for the API. Without a timeout requests.get() can block forever and the
    # thread would never report back.
    _TIMEOUT = 10

    def __init__(self, _this):
        """Store the calling object; no currency is selected yet."""
        threading.Thread.__init__(self)
        self._calling_class = _this
        # Currency identifier chosen via set_id(); empty until then.
        self._get_id = ""

    def run(self):
        """Fetch the ticker once and pass the matching bid to the calling object."""
        # Truthiness check so that a None id is reported as "not selected" instead of failing
        # later on .lower() and being misreported as a connection problem.
        if not self._get_id:
            self._calling_class.error("CURRENCY NOT SELECTED")
            return
        try:
            r = requests.get(self._API_URL, timeout=self._TIMEOUT)
            value = self._parse(r.text)
            self._calling_class.newY(value, self._get_id.upper())
        except Exception:
            # Deliberately broad: this is the thread's top-level boundary and every failure
            # (network, timeout, unexpected payload) is surfaced to the caller the same way.
            self._calling_class.error("CONNECTION ERROR")

    def set_id(self, _id):
        """Select the currency whose bid will be fetched by run()."""
        self._get_id = _id

    def _parse(self, text):
        """Return the bid of the selected market, or None when the market is not listed.

        :param text: Raw response body, decoded with ``helper.loadJson``.
        """
        data = helper.loadJson(text=text)
        look_for = self._get_id.lower() + self._SUFFIX
        for item in data:
            if item[self._LOOK_TAG] == look_for:
                return item[self._GET_TAG]
        return None
    # ... (the listing is truncated here in the source page)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run stestr automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful