Best Python code snippet using pyresttest_python
storcli.py
Source:storcli.py  
# ... (the listing starts mid-function; the enclosing definition is not shown)
#         pass
#     print_all_metrics(metric_list)


def handle_sas_controller(response):
    """Record controller and per-drive metrics for a SAS controller response.

    Args:
        response: Mapping with the controller data. It is indexed with
            'Basics', 'Version', 'Status', 'HwCfg' and
            'Physical Device Information'.
    """
    (controller_index, baselabel) = get_basic_controller_info(response)
    add_metric('healthy', baselabel, int(response['Status']['Controller Status'] == 'OK'))
    add_metric('ports', baselabel, response['HwCfg']['Backend Port Count'])

    device_info = response['Physical Device Information']
    # Only a mapping can be counted and walked below. Anything else is
    # treated as "no drive data" for both the count and the per-drive loop,
    # instead of being ignored for the count and crashing in the loop.
    has_device_map = isinstance(device_info, dict)
    if has_device_map:
        # The number of physical disks is half of the number of items in this dict
        # Every disk is listed twice - once for basic info, again for detailed info
        add_metric('physical_drives', baselabel, len(device_info) / 2)
    # Split up string to not trigger CodeSpell issues
    add_metric('temperature', baselabel,
               int(response['HwCfg']['ROC temperature(Degree Celc' + 'ius)']))
    if not has_device_map:
        return
    for key, basic_disk_info in device_info.items():
        if 'Detailed Information' in key:
            continue
        create_metrcis_of_physical_drive(basic_disk_info[0], device_info, controller_index)


def handle_megaraid_controller(response):
    """Record controller, virtual-drive and physical-drive metrics.

    Args:
        response: Mapping with the controller data. It is indexed with
            'Basics', 'Version', 'Status', 'HwCfg', 'Scheduled Tasks',
            'Cachevault_Info', 'Drive Groups', 'Virtual Drives',
            'Physical Drives', 'VD LIST' and 'PD LIST'.
    """
    (controller_index, baselabel) = get_basic_controller_info(response)
    controller_status = response['Status']['Controller Status']
    # BBU Status Optimal value is 0 for cachevault and 32 for BBU
    add_metric('battery_backup_healthy', baselabel,
               int(response['Status']['BBU Status'] in [0, 32]))
    add_metric('degraded', baselabel, int(controller_status == 'Degraded'))
    add_metric('failed', baselabel, int(controller_status == 'Failed'))
    add_metric('healthy', baselabel, int(controller_status == 'Optimal'))
    add_metric('drive_groups', baselabel, response['Drive Groups'])
    add_metric('virtual_drives', baselabel, response['Virtual Drives'])
    add_metric('physical_drives', baselabel, response['Physical Drives'])
    add_metric('ports', baselabel, response['HwCfg']['Backend Port Count'])
    add_metric('scheduled_patrol_read', baselabel,
               int('hrs' in response['Scheduled Tasks']['Patrol Read Reoccurrence']))
    add_metric('temperature', baselabel,
               int(response['HwCfg']['ROC temperature(Degree Celsius)']))
    for cvidx, cvinfo in enumerate(response['Cachevault_Info']):
        add_metric('cv_temperature', baselabel + ',cvidx="' + str(cvidx) + '"',
                   int(cvinfo['Temp'].replace('C', '')))

    # Parse the two clocks only when both strings are present; strptime()
    # raises TypeError when handed None.
    system_time_text = response['Basics'].get('Current System Date/time')
    controller_time_text = response['Basics'].get('Current Controller Date/Time')
    if system_time_text and controller_time_text:
        time_format = "%m/%d/%Y, %H:%M:%S"
        system_time = datetime.strptime(system_time_text, time_format)
        controller_time = datetime.strptime(controller_time_text, time_format)
        # total_seconds() includes whole days; timedelta.seconds alone would
        # report a skew of one day and ten seconds as just ten seconds.
        add_metric('time_difference', baselabel,
                   abs(system_time - controller_time).total_seconds())

    for virtual_drive in response['VD LIST']:
        vd_position = virtual_drive.get('DG/VD')
        drive_group, volume_group = -1, -1
        if vd_position:
            vd_parts = vd_position.split('/')
            drive_group, volume_group = vd_parts[0], vd_parts[1]
        vd_baselabel = 'controller="{}",DG="{}",VG="{}"'.format(controller_index, drive_group,
                                                                volume_group)
        vd_info_label = vd_baselabel + ',name="{}",cache="{}",type="{}",state="{}"'.format(
            str(virtual_drive.get('Name')).strip(),
            str(virtual_drive.get('Cache')).strip(),
            str(virtual_drive.get('TYPE')).strip(),
            str(virtual_drive.get('State')).strip())
        add_metric('vd_info', vd_info_label, 1)

    # Start from an empty mapping so the loop below never sees an unbound
    # name; with no detailed data each drive still gets its 'pd_info' metric.
    drive_info = {}
    if response['Physical Drives'] > 0:
        data = get_storcli_json('/cALL/eALL/sALL show all J')
        drive_info = data['Controllers'][controller_index]['Response Data']
    for physical_drive in response['PD LIST']:
        create_metrcis_of_physical_drive(physical_drive, drive_info, controller_index)


def get_basic_controller_info(response):
    """Record the 'controller_info' metric and build the base label.

    Args:
        response: Mapping indexed with 'Basics' ('Controller', 'Model',
            'Serial Number') and 'Version' ('Firmware Version').

    Returns:
        Tuple ``(controller_index, baselabel)`` where ``baselabel`` is the
        string ``controller="<index>"``.
    """
    basics = response['Basics']
    controller_index = basics['Controller']
    baselabel = 'controller="{}"'.format(controller_index)
    controller_info_label = baselabel + ',model="{}",serial="{}",fwversion="{}"'.format(
        str(basics['Model']).strip(),
        str(basics['Serial Number']).strip(),
        str(response['Version']['Firmware Version']).strip(),
    )
    add_metric('controller_info', controller_info_label, 1)
    return (controller_index, baselabel)


def create_metrcis_of_physical_drive(physical_drive, detailed_info_array, controller_index):
    """Record the metrics of one physical drive.

    Args:
        physical_drive: Mapping with the basic drive fields ('EID:Slt',
            'DID', 'Intf', 'Med', 'Model', 'DG', 'State').
        detailed_info_array: Mapping looked up with
            '<drive identifier> - Detailed Information'. When that entry or
            any key inside it is missing, the detailed metrics recorded so
            far are kept, the rest are skipped, and 'pd_info' is still
            recorded.
        controller_index: Controller number used in labels and in the drive
            identifier.
    """
    eid_slt = physical_drive.get('EID:Slt').split(':')
    enclosure = eid_slt[0]
    slot = eid_slt[1]
    pd_baselabel = 'controller="{}",enclosure="{}",slot="{}"'.format(controller_index, enclosure,
                                                                     slot)
    pd_info_label = pd_baselabel + \
        ',disk_id="{}",interface="{}",media="{}",model="{}",DG="{}",state="{}"'.format(
            str(physical_drive.get('DID')).strip(),
            str(physical_drive.get('Intf')).strip(),
            str(physical_drive.get('Med')).strip(),
            str(physical_drive.get('Model')).strip(),
            str(physical_drive.get('DG')).strip(),
            str(physical_drive.get('State')).strip())
    if enclosure == ' ':
        # A single-space enclosure id yields an identifier without '/e'.
        drive_identifier = 'Drive /c' + str(controller_index) + '/s' + str(slot)
    else:
        drive_identifier = 'Drive /c' + str(controller_index) + '/e' + str(enclosure) + \
            '/s' + str(slot)
    try:
        info = detailed_info_array[drive_identifier + ' - Detailed Information']
        state = info[drive_identifier + ' State']
        attributes = info[drive_identifier + ' Device attributes']
        settings = info[drive_identifier + ' Policies/Settings']
        add_metric('pd_shield_counter', pd_baselabel, state['Shield Counter'])
        add_metric('pd_media_errors', pd_baselabel, state['Media Error Count'])
        add_metric('pd_other_errors', pd_baselabel, state['Other Error Count'])
        add_metric('pd_predictive_errors', pd_baselabel, state['Predictive Failure Count'])
        add_metric('pd_smart_alerted', pd_baselabel,
                   int(state['S.M.A.R.T alert flagged by drive'] == 'Yes'))
        add_metric('pd_link_speed_gbps', pd_baselabel, attributes['Link Speed'].split('.')[0])
        add_metric('pd_device_speed_gbps', pd_baselabel,
                   attributes['Device Speed'].split('.')[0])
        add_metric('pd_commissioned_spare', pd_baselabel,
                   int(settings['Commissioned Spare'] == 'Yes'))
        add_metric('pd_emergency_spare', pd_baselabel, int(settings['Emergency Spare'] == 'Yes'))
        pd_info_label += ',firmware="{}"'.format(attributes['Firmware Revision'].strip())
    except KeyError:
        pass
    add_metric('pd_info', pd_info_label, 1)


def add_metric(name, labels, value):
    """Append one measurement to the module-level ``metric_list``.

    Args:
        name: Key in ``metric_list`` under which the measurement is stored.
        labels: Label string stored with the measurement.
        value: Anything ``float()`` accepts. A value it rejects (for example
            a non-numeric string or ``None``) is skipped silently.
    """
    global metric_list
    try:
        numeric_value = float(value)
    except (TypeError, ValueError):
        # float(None) raises TypeError, float('N/A') raises ValueError;
        # either way there is nothing numeric to record.
        return
    metric_list[name].append({
        'labels': labels,
        'value': numeric_value,
    })


def print_all_metrics(metrics):
    for metric, measurements in metrics.items():
        print('# HELP {}{} MegaRAID {}'.format(metric_prefix, metric, metric.replace('_', ' ')))
        print('# TYPE {}{} gauge'.format(metric_prefix, metric))
        for measurement in measurements:
            if measurement['value'] != 'Unknown':
                ...  # the listing is truncated here
# metrics.py
Source:metrics.py  
# ... (the listing resumes inside the docstring of the metric class; the
# class header itself is not shown)
# NOTE(review): the header below is reconstructed. The name comes from the
# Metric() call in get_metrics(); confirm the base class against the full file.
class Metric(object):
    """
    This is the metric class that is used as a data structure as well as for
    the formatting and sending of cloudwatch metrics.
    """

    # Class-level dict, so it is shared by every Metric instance.
    # Layout: {unit: {metric key: value}}.
    _metrics = {}

    def add_metric(self, key, value, unit='Percent'):
        """Store ``value`` under ``key`` in the bucket for ``unit``.

        A later call with the same ``unit`` and ``key`` overwrites the
        earlier value.
        """
        self._metrics.setdefault(unit, {})[key] = value

    def get_auto_scaling_group_name(self, instance_id, region):
        """Return the auto scaling group name reported for ``instance_id``.

        Returns the 'AutoScalingGroupName' of the first entry in the
        'AutoScalingInstances' list of the describe call, or ``None`` when
        that list is absent or empty.
        """
        autoscale = boto3.client('autoscaling', region_name=region)
        response = autoscale.describe_auto_scaling_instances(
            InstanceIds=[instance_id],
        )
        for instance in response.get('AutoScalingInstances', []):
            return instance['AutoScalingGroupName']
        return None

    def send(self):
        """Send every stored metric to cloudwatch.

        Each metric is put into the 'EC2' namespace with an 'InstanceId'
        dimension and, when an auto scaling group name is found, a second
        time with an 'AutoScalingGroup' dimension.
        """
        metadata = get_instance_metadata()
        instance_id = metadata['instance-id']
        # The region name is the availability zone minus its last character.
        region = metadata['placement']['availability-zone'][0:-1]
        cloudwatch = boto3.client('cloudwatch', region_name=region)
        group = self.get_auto_scaling_group_name(instance_id, region)

        dimensions = [{'Name': 'InstanceId', 'Value': instance_id}]
        if group:
            dimensions.append({'Name': 'AutoScalingGroup', 'Value': group})

        for unit, metrics in self._metrics.items():
            for key, value in metrics.items():
                for dimension in dimensions:
                    cloudwatch.put_metric_data(
                        Namespace='EC2',
                        MetricData=[
                            {
                                'MetricName': key,
                                'Dimensions': [dimension],
                                'Value': value,
                                'Unit': unit,
                            },
                        ],
                    )


def disk_partitions(disk_ntuple, all=False):
    """Return all mounted partitions as a list of ``disk_ntuple`` records.

    If all == False return physical partitions only, i.e. entries whose
    filesystem type is listed in /proc/filesystems without the 'nodev' prefix.

    Args:
        disk_ntuple: Callable invoked as ``disk_ntuple(device, mountpoint,
            fstype)`` for every partition kept.
        all: When true, keep every /etc/mtab entry.
    """
    phydevs = []
    if os.path.exists('/proc/filesystems'):
        # 'with' closes the handle even if reading fails.
        with open('/proc/filesystems', 'r') as filesystems:
            for line in filesystems:
                if not line.startswith('nodev'):
                    phydevs.append(line.strip())
    else:
        print('path does not exist: /proc/filesystems')

    retlist = []
    if os.path.exists('/etc/mtab'):
        with open('/etc/mtab', 'r') as mtab:
            for line in mtab:
                if not all and line.startswith('none'):
                    continue
                fields = line.split()
                if len(fields) < 3:
                    # Blank or malformed line: no device/mountpoint/fstype.
                    continue
                device, mountpoint, fstype = fields[0], fields[1], fields[2]
                if not all and fstype not in phydevs:
                    continue
                if device == 'none':
                    device = ''
                retlist.append(disk_ntuple(device, mountpoint, fstype))
    else:
        print('path does not exist: /etc/mtab')
    return retlist


def disk_usage(path, usage_ntuple):
    """Return disk usage associated with path.

    Args:
        path: Path handed to ``os.statvfs``.
        usage_ntuple: Callable invoked as ``usage_ntuple(total, used, free,
            percent)``; sizes are block counts multiplied by ``f_frsize``.

    Returns:
        The ``usage_ntuple`` result; ``percent`` is rounded to one decimal
        and is 0 when the filesystem reports zero blocks.
    """
    fs_stats = os.statvfs(path)
    block_size = fs_stats.f_frsize
    free = fs_stats.f_bavail * block_size
    total = fs_stats.f_blocks * block_size
    used = (fs_stats.f_blocks - fs_stats.f_bfree) * block_size
    try:
        percent = (float(used) / total) * 100
    except ZeroDivisionError:
        percent = 0
    # NB: the percentage is -5% than what shown by df due to
    # reserved blocks that we are currently not considering:
    # http://goo.gl/sWGbH
    return usage_ntuple(total, used, free, round(percent, 1))


def get_process_state(process_name):
    """Print and return the status of the first process named ``process_name``.

    Returns:
        The ``status()`` string of the first matching process, or
        ``"missing"`` when no process has that name.
    """
    for pid in psutil.pids():
        try:
            process = psutil.Process(pid)
            if process.name() != process_name:
                continue
            status = process.status()
        except psutil.NoSuchProcess:
            # The process exited between pids() and the lookup; skip it.
            continue
        print(process_name + ":" + status)
        return status
    print(process_name + ":missing")
    return "missing"


def get_mount_usage(mount_name):
    """Print and return the usage percentage of ``mount_name``.

    Returns:
        The percentage as a float rounded to two decimals, or ``0.00`` when
        the path does not exist. Both branches return a number.
    """
    if not os.path.exists(mount_name):
        print(mount_name + ": missing")
        return 0.00
    usage = psutil.disk_usage(mount_name).percent
    print(mount_name + ": %.2f" % usage)
    return round(usage, 2)


def get_metrics():
    """This function will gather the main components to send to cloudwatch."""
    disk_ntuple = namedtuple('partition', 'device mountpoint fstype')
    usage_ntuple = namedtuple('usage', 'total used free percent')

    # This is due to cron not having access to the normal environment variables.
    os.environ['HTTP_PROXY'] = 'http://10.192.116.73:8080/'
    os.environ['HTTPS_PROXY'] = 'http://10.192.116.73:8080/'
    os.environ['NO_PROXY'] = '169.254.169.254,127.0.0.1,localhost'

    # Each call blocks for its 'interval' in seconds before returning.
    cpu_load_1_sec = psutil.cpu_percent(interval=1)
    cpu_load_5_sec = psutil.cpu_percent(interval=5)
    cpu_load_15_sec = psutil.cpu_percent(interval=15)
    cpu_load_60_sec = psutil.cpu_percent(interval=60)

    memory = psutil.virtual_memory()
    mem_total = memory.total / 1024 / 1024
    mem_used = memory.used / 1024 / 1024
    mem_utilized = memory.percent

    swap = psutil.swap_memory()
    swap_total = swap.total / 1024 / 1024
    swap_used = swap.used / 1024 / 1024
    swap_utilized = swap.percent

    print("CPU1s  : %s" % cpu_load_1_sec)
    print("CPU5s  : %s" % cpu_load_5_sec)
    print("CPU15s : %s" % cpu_load_15_sec)
    print("CPU60s : %s" % cpu_load_60_sec)
    print("Memory_total: %s" % mem_total)
    print("Memory_used: %s" % mem_used)
    print("Memory_utilized: %s" % mem_utilized)
    print("Swap_total: %s" % swap_total)
    print("Swap_used: %s" % swap_used)
    print("Swap_utilized: %s" % swap_utilized)

    my_metrics = Metric()
    my_metrics.add_metric('CPU_1s', cpu_load_1_sec)
    my_metrics.add_metric('CPU_5s', cpu_load_5_sec)
    my_metrics.add_metric('CPU_15s', cpu_load_15_sec)
    my_metrics.add_metric('CPU_60s', cpu_load_60_sec)
    my_metrics.add_metric('Memory_total', mem_total)
    # NOTE(review): 'Memory_user' looks like a typo for 'Memory_used'; it is
    # kept because renaming it would change the published metric name.
    my_metrics.add_metric('Memory_user', mem_used)
    my_metrics.add_metric('Memory_utilized', mem_utilized)
    my_metrics.add_metric('Swap_total', swap_total)
    my_metrics.add_metric('Swap_used', swap_used)
    my_metrics.add_metric('Swap_utilized', swap_utilized)

    root_usage = psutil.disk_usage("/")
    my_metrics.add_metric('Mount_root_total', root_usage.total)
    my_metrics.add_metric('Mount_root_available', root_usage.free)
    my_metrics.add_metric('Mount_root_used', root_usage.used)
    my_metrics.add_metric('Mount_root_utilized', get_mount_usage("/"))
    for mount in ("/boot", "/home", "/opt", "/usr/gems", "/usr/openv",
                  "/var", "/dev", "/net"):
        my_metrics.add_metric('Mount_' + mount, get_mount_usage(mount))

    # NOTE(review): the values stored here are status strings, not numbers;
    # confirm that the cloudwatch call accepts them.
    for process_name in ("ntpd", "sshd", "crond", "syslogd", "xinetd"):
        my_metrics.add_metric('Process_' + process_name, get_process_state(process_name))

    for part in disk_partitions(disk_ntuple):
        mountpoint = part.mountpoint
        my_metrics.add_metric('DiskUsage_' + mountpoint,
                              disk_usage(mountpoint, usage_ntuple).percent)

    # Send once, after everything is collected. Inside the loop above the
    # whole set was re-sent per partition and never sent with no partitions.
    my_metrics.send()


if __name__ == '__main__':
    ...  # the listing is truncated here
# jetson_stats_prometheus_collector.py
Source:jetson_stats_prometheus_collector.py  
...42            #43            # Board info 44            #45            i = InfoMetricFamily('jetson_info_board', 'Board sys info', labels=['board_info'])46            i.add_metric(['info'], {47                'machine': self._jetson.board['info']['machine'], 48                'jetpack': self._jetson.board['info']['jetpack'], 49                'l4t': self._jetson.board['info']['L4T']50                })51            yield i52            i = InfoMetricFamily('jetson_info_hardware', 'Board hardware info', labels=['board_hw'])                    53            i.add_metric(['hardware'], {54                'type': self._jetson.board['hardware']['TYPE'],55                'codename': self._jetson.board['hardware']['CODENAME'],56                'soc': self._jetson.board['hardware']['SOC'],57                'chip_id': self._jetson.board['hardware']['CHIP_ID'],58                'module': self._jetson.board['hardware']['MODULE'],59                'board': self._jetson.board['hardware']['BOARD'],60                'cuda_arch_bin': self._jetson.board['hardware']['CUDA_ARCH_BIN'],61                'serial_number': self._jetson.board['hardware']['SERIAL_NUMBER']62                })63            yield i64            #65            # NV power mode66            #67            i = InfoMetricFamily('jetson_nvpmode', 'NV power mode', labels=['nvpmode'])68            i.add_metric(['mode'], {'mode': self._jetson.nvpmodel.name})69            yield i70            #71            # System uptime 72            #73            g = GaugeMetricFamily('jetson_uptime', 'System uptime', labels=['uptime'])74            days = self._jetson.uptime.days75            seconds = self._jetson.uptime.seconds76            hours = seconds//360077            minutes = (seconds//60) % 6078            g.add_metric(['days'], days)79            g.add_metric(['hours'], hours)80            g.add_metric(['minutes'], minutes)81            yield g82            #83            # CPU usage 84            #85            g = 
GaugeMetricFamily('jetson_usage_cpu', 'CPU % schedutil', labels=['cpu'])86            g.add_metric(['cpu_1'], self._jetson.cpu['CPU1']['val'])87            g.add_metric(['cpu_2'], self._jetson.cpu['CPU2']['val'])88            g.add_metric(['cpu_3'], self._jetson.cpu['CPU3']['val'])89            g.add_metric(['cpu_4'], self._jetson.cpu['CPU4']['val'])90            g.add_metric(['cpu_5'], self._jetson.cpu['CPU5']['val'])91            g.add_metric(['cpu_6'], self._jetson.cpu['CPU6']['val'])92            g.add_metric(['cpu_7'], self._jetson.cpu['CPU7']['val'])93            g.add_metric(['cpu_8'], self._jetson.cpu['CPU8']['val'])94            yield g95            # 96            # GPU usage97            #98            g = GaugeMetricFamily('jetson_usage_gpu', 'GPU % schedutil', labels=['gpu'])99            g.add_metric(['val'], self._jetson.gpu['val'])100            # g.add_metric(['frq'], self._jetson.gpu['frq'])101            # g.add_metric(['min_freq'], self._jetson.gpu['min_freq'])102            # g.add_metric(['max_freq'], self._jetson.gpu['max_freq'])103            yield g104            # 105            # RAM usage106            #107            g = GaugeMetricFamily('jetson_usage_ram', 'Memory usage', labels=['ram'])108            g.add_metric(['used'], self._jetson.ram['use'])109            g.add_metric(['shared'], self._jetson.ram['shared'])110            # g.add_metric(['total'], self._jetson.ram['tot'])111            # g.add_metric(['unit'], self._jetson.ram['unit'])112            yield g113            # 114            # Disk usage115            #116            g = GaugeMetricFamily('jetson_usage_disk', 'Disk space usage', labels=['disk'])117            g.add_metric(['used'], self._jetson.disk['used'])118            g.add_metric(['total'], self._jetson.disk['total'])119            g.add_metric(['available'], self._jetson.disk['available'])120            g.add_metric(['available_no_root'], self._jetson.disk['available_no_root'])121            yield g122         
   # 123            # Fan usage124            #125            g = GaugeMetricFamily('jetson_usage_fan', 'Fan usage', labels=['fan'])126            g.add_metric(['speed'], self._jetson.fan['speed'])127            # g.add_metric(['measure'], self._jetson.fan['measure'])128            # g.add_metric(['auto'], self._jetson.fan['auto'])129            # g.add_metric(['rpm'], self._jetson.fan['rpm'])130            # g.add_metric(['mode'], self._jetson.fan['mode'])131            yield g132            # 133            # Swapfile usage134            #135            g = GaugeMetricFamily('jetson_usage_swap', 'Swapfile usage', labels=['swap'])136            g.add_metric(['used'], self._jetson.swap['use'])137            g.add_metric(['total'], self._jetson.swap['tot'])138            # g.add_metric(['unit'], self._jetson.swap['unit'])139            # g.add_metric(['cached_size'], self._jetson.swap['cached']['size'])140            # g.add_metric(['cached_unit'], self._jetson.swap['cached']['unit'])141            yield g142            # 143            # Sensor temperatures144            #145            g = GaugeMetricFamily('jetson_temperatures', 'Sensor temperatures', labels=['temperature'])146            g.add_metric(['ao'], self._jetson.temperature['AO'] if 'AO' in self._jetson.temperature else 0)147            g.add_metric(['gpu'], self._jetson.temperature['GPU'] if 'GPU' in self._jetson.temperature else 0)148            g.add_metric(['tdiode'], self._jetson.temperature['Tdiode'] if 'Tdiode' in self._jetson.temperature else 0)149            g.add_metric(['aux'], self._jetson.temperature['AUX'] if 'AUX' in self._jetson.temperature else 0)150            g.add_metric(['cpu'], self._jetson.temperature['CPU'] if 'CPU' in self._jetson.temperature else 0)151            g.add_metric(['thermal'], self._jetson.temperature['thermal'] if 'thermal' in self._jetson.temperature else 0)152            g.add_metric(['tboard'], self._jetson.temperature['Tboard'] if 'Tboard' in 
self._jetson.temperature else 0)153            yield g154            # 155            # Power156            #157            g = GaugeMetricFamily('jetson_usage_power', 'Power usage', labels=['power'])158            g.add_metric(['cpu'], self._jetson.power[1]['CPU']['cur'] if 'CPU' in self._jetson.power[1] else 0)159            g.add_metric(['cv'], self._jetson.power[1]['CV']['cur'] if 'CV' in self._jetson.power[1] else 0)160            g.add_metric(['gpu'], self._jetson.power[1]['GPU']['cur'] if 'GPU' in self._jetson.power[1] else 0)161            g.add_metric(['soc'], self._jetson.power[1]['SOC']['cur'] if 'SOC' in self._jetson.power[1] else 0)162            g.add_metric(['sys5v'], self._jetson.power[1]['SYS5V']['cur'] if 'SYS5V' in self._jetson.power[1] else 0)163            g.add_metric(['vddrq'], self._jetson.power[1]['VDDRQ']['cur'] if 'VDDRQ' in self._jetson.power[1] else 0)164            yield g165if __name__ == '__main__':166    parser = argparse.ArgumentParser()167    parser.add_argument('--port', type=int, default=8000, help='Metrics collector port number')168    args = parser.parse_args()169    start_http_server(args.port)170    REGISTRY.register(CustomCollector())171    while True:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
