Best Python code snippet using localstack_python
main_loop.py
Source:main_loop.py  
...214            r.lpush('get-cloudwatch-metrics-reply', metric_json)215        except Exception, err:216            servo.log.error('failed to publish cw metrics data: %s' % str(err))217    @staticmethod218    def report_instance_status(msg):219        hc_mgr = ServoLoop.hc_mgr220        try:221            if not 'data' in msg or not msg['data']:222                raise Exception('No data field in the received redis message')223            msg = msg['data']224        except Exception, err:225            servo.log.error('failed to parse get-instance-status message')226        instances = {}227        for instance_id in hc_mgr.list_instances():228            status = hc_mgr.health_status(instance_id)229            if status is None:230                continue231            instances[instance_id] = status232        json_format = json.dumps(instances)...nexpose.py
Source:nexpose.py  
import rapid7vmconsole
import csv
import base64
from time import sleep

try:
    from StringIO import StringIO  # Python 2
except ImportError:
    from io import StringIO  # Python 3


def generate_report(config):
    """Build a vulnerability report on the console and return critical rows.

    Creates a SQL-query report, runs it, downloads the CSV result, writes it
    to ``nexpose_vul_report1.txt`` and filters it. The server-side report is
    always deleted afterwards (best effort).

    Args:
        config: mapping with the keys ``'username'``, ``'password'`` and
            ``'host'`` used to reach the console.

    Returns:
        A list of CSV rows (dicts keyed by column name) whose ``cvss_score``
        is greater than 9 and whose ``exploits`` count is at least 1, or
        ``None`` if creating, running, downloading or saving the report
        failed.
    """
    r7config = rapid7vmconsole.Configuration(name='Rapid7')
    # There is no OAuth. We know this is dumb. For demo purposes

    r7config.username = config['username']
    r7config.password = config['password']
    r7config.host = config['host']
    # NOTE(review): TLS verification is switched off here; fine for a demo
    # against a self-signed console, not for production use.
    r7config.verify_ssl = False
    r7config.assert_hostname = False
    r7config.proxy = None
    r7config.ssl_ca_cert = None
    r7config.connection_pool_maxsize = None
    r7config.cert_file = None
    r7config.key_file = None
    r7config.safe_chars_for_path_param = ''

    auth = "%s:%s" % (r7config.username, r7config.password)
    auth = base64.b64encode(auth.encode('ascii')).decode()
    client = rapid7vmconsole.ApiClient(configuration=r7config)
    client.default_headers['Authorization'] = "Basic %s" % auth
    report_client = rapid7vmconsole.ReportApi(client)

    # Stays None until the report exists, so cleanup never touches an
    # unbound name when creation itself fails.
    report_id = None
    try:
        report_id = create_report_sql(report_client, 'vulnReport1', '''
        select da.asset_id, da.mac_address, da.ip_address, das.port, dv.vulnerability_id,
            dv.title, dv.description, dv.severity, dv.cvss_score, dv.exploits, dv.nexpose_id
        from fact_asset_vulnerability_finding as fpr
        join dim_vulnerability as dv on fpr.vulnerability_id = dv.vulnerability_id
        join dim_asset as da on fpr.asset_id = da.asset_id
        join dim_asset_service as das on fpr.asset_id = das.asset_id''')

        report_instance_id = run_report(report_client, report_id)

        report = download_report(report_client, report_id, report_instance_id)
        with open('nexpose_vul_report1.txt', 'w') as f:
            f.write(report)

        # DictReader consumes the header row itself; advancing the reader
        # once more (as the previous code did) silently dropped the first
        # finding.
        critical_vulns = []
        for row in csv.DictReader(StringIO(report)):
            try:
                if float(row['cvss_score']) > 9 and int(row['exploits']) >= 1:
                    critical_vulns.append(row)
            except (KeyError, TypeError, ValueError) as e:
                # A malformed row is reported and skipped, not fatal.
                print(e)

        return critical_vulns

    except Exception as e:
        print(e)
        return None

    finally:
        # Best-effort cleanup: a failed delete must not discard the result.
        if report_id is not None:
            try:
                delete_report(report_client, report_id)
            except Exception as e:
                print(e)


def create_report_sql(client, report_name, sql):
    """Create a ``sql-query`` report definition and return its id."""
    report_config = rapid7vmconsole.Report(name=report_name, format='sql-query', query=sql, version='2.3.0')
    response = client.create_report(report=report_config)
    return response.id


def run_report(client, report_id):
    """Start generating the report and return the id of the new instance."""
    report = client.generate_report(report_id)
    return report.id


def download_report(client, report_id, instance_id, poll_interval=30):
    """Wait for a report instance to finish, then download its contents.

    Args:
        client: report API client.
        report_id: id of the report definition.
        instance_id: id of the generated instance to wait for.
        poll_interval: seconds to sleep between status checks.

    Returns:
        Whatever ``client.download_report`` returns for the instance.
    """
    while True:
        status = client.get_report_instance(report_id, instance_id).status

        # Exact match on the terminal states; the former substring test also
        # accepted values such as '' or 'fail'.
        # NOTE(review): the download is still attempted for 'aborted' and
        # 'failed', exactly as before - confirm that is intended.
        if status in ('aborted', 'failed', 'complete'):
            return client.download_report(report_id, instance_id)

        sleep(poll_interval)


def delete_report(client, report_id):
    """Delete the report definition from the console."""
    client.delete_report(report_id)


if __name__ == '__main__':
    pass  # the entry-point body is truncated in this snippet
...generate_sql_report.py
Source:generate_sql_report.py  
import rapid7vmconsole
import base64
from time import sleep


def generate_report(username='nxadmin', password='nxpassword',
                    host='https://localhost:3780'):
    """Run ``select * from dim_asset`` as a SQL report and print the result.

    Prints the report id, the report instance id and the downloaded report.
    The server-side report is deleted afterwards, including when running or
    downloading fails.

    Args:
        username: console account name.
        password: console account password.
        host: base URL of the console.
    """
    config = rapid7vmconsole.Configuration(name='Rapid7')
    config.username = username
    config.password = password
    config.host = host
    # NOTE(review): TLS verification is switched off here; acceptable for a
    # local demo console only.
    config.verify_ssl = False
    config.assert_hostname = False
    config.proxy = None
    config.ssl_ca_cert = None
    config.connection_pool_maxsize = None
    config.cert_file = None
    config.key_file = None
    config.safe_chars_for_path_param = ''
    auth = "%s:%s" % (config.username, config.password)
    auth = base64.b64encode(auth.encode('ascii')).decode()
    client = rapid7vmconsole.ApiClient(configuration=config)
    client.default_headers['Authorization'] = "Basic %s" % auth
    report_client = rapid7vmconsole.ReportApi(client)
    report_id = create_report_sql(report_client, 'Assets', 'select * from dim_asset')
    try:
        print(report_id)
        report_instance_id = run_report(report_client, report_id)
        print(report_instance_id)
        report = download_report(report_client, report_id, report_instance_id)
        print(report)
    finally:
        # Do not leave the report behind on the console when a step fails.
        delete_report(report_client, report_id)


def create_report_sql(client, report_name, sql):
    """Create a ``sql-query`` report definition and return its id."""
    report_config = rapid7vmconsole.Report(name=report_name, format='sql-query', query=sql, version='2.3.0')
    response = client.create_report(report=report_config)
    return response.id


def run_report(client, report_id):
    """Start generating the report and return the id of the new instance."""
    report = client.generate_report(report_id)
    return report.id


def download_report(client, report_id, instance_id, poll_interval=30):
    """Wait for a report instance to finish, then download its contents.

    Args:
        client: report API client.
        report_id: id of the report definition.
        instance_id: id of the generated instance to wait for.
        poll_interval: seconds to sleep between status checks.

    Returns:
        Whatever ``client.download_report`` returns for the instance.
    """
    while True:
        status = client.get_report_instance(report_id, instance_id).status
        # Exact match on the terminal states; the former substring test also
        # accepted values such as '' or 'fail'.
        if status in ('aborted', 'failed', 'complete'):
            return client.download_report(report_id, instance_id)
        sleep(poll_interval)


def delete_report(client, report_id):
    """Delete the report definition from the console."""
    client.delete_report(report_id)


if __name__ == '__main__':
    pass  # the entry-point body is truncated in this snippet

# ...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
