Best Python code snippet using localstack_python
ros-tls.py
Source:ros-tls.py  
# ... (earlier lines of ros-tls.py are not shown in this excerpt)


def run_command(client, command):
    """Run ``command`` over SSH and echo its stdout and stderr.

    Args:
        client: Connected SSH client exposing ``exec_command``.
        command: Command line to execute on the remote device.
    """
    _, stdout, stderr = client.exec_command(command)
    print(''.join(stdout.readlines()))
    print(''.join(stderr.readlines()))


def renew_certificate(host, lego_exe_path, email):
    """Use the lego client to request a new Let's Encrypt certificate.

    Args:
        host: Domain name the certificate is requested for.
        lego_exe_path: Path to the ``lego`` executable.
        email: Account e-mail passed to lego.

    Returns:
        True when lego exits with status 0, False otherwise.
    """
    result = subprocess.call([
        lego_exe_path,
        '--domains', host,
        '--email', email,
        '--accept-tos',
        '--dns', 'route53',
        'run',
    ])
    if result == 0:
        print(Fore.GREEN + '--> Renewed certificate!' + Fore.RESET)
        return True
    print(Fore.RED + '--> Failed to renew certificate!' + Fore.RESET)
    return False


def connect_via_ssh(host, ssh_user, ssh_key_path):
    """Open an SSH connection to ``host`` using only the given key file.

    Returns:
        The connected client; the caller is responsible for closing it.
    """
    client = SSHClient()
    client.load_system_host_keys()
    client.connect(
        host,
        username=ssh_user,
        key_filename=ssh_key_path,
        allow_agent=False,
        look_for_keys=False,
        # NOTE(review): presumably the device only accepts legacy ssh-rsa
        # signatures, hence the rsa-sha2 variants are disabled - confirm.
        disabled_algorithms=dict(pubkeys=['rsa-sha2-512', 'rsa-sha2-256']),
    )
    return client


def _upload_text_file(sftp_client, local_path, remote_name):
    """Copy the text file at ``local_path`` to ``remote_name`` over SFTP."""
    with open(local_path, 'r') as local_file:
        with sftp_client.open(remote_name, 'w') as remote_file:
            remote_file.write(local_file.read())


def upload_certificate(host, sftp_client, certificate_path):
    """Upload the certificate file to the device as ``<host>.crt``."""
    print('--> Uploading certificate')
    _upload_text_file(sftp_client, certificate_path, host + '.crt')
    print('--> Uploaded certificate')


def upload_key(host, sftp_client, private_key_path):
    """Upload the private key file to the device as ``<host>.key``."""
    print('--> Uploading private key')
    _upload_text_file(sftp_client, private_key_path, host + '.key')
    print('--> Uploaded private key')


def get_current_certificate(client):
    """Return the certificate configured for the www-ssl service, if any.

    Returns:
        The first certificate name matched in the service details, or None
        when nothing matches or the value is ``*1`` / ``none``.
    """
    _, stdout, _ = client.exec_command('/ip service print detail where name=www-ssl')
    output = ''.join(stdout.readlines())
    certificates = MATCH_CERTIFICATE.findall(output)
    if not certificates:
        print('--> No current certificate found')
        return None
    current = certificates[0]
    if current in ('*1', 'none'):
        return None
    print('--> Configured to use certificate "%s"' % current)
    return current


def delete_certificate(client, certificate):
    """Remove ``certificate`` and its revocation list from the device."""
    print('--> Deleting certificate %s' % certificate)
    run_command(client, '/certificate remove ' + certificate)
    # Only the trailing "crt" (the suffix) becomes "crl"; a host name that
    # itself contains "crt" must not be rewritten.
    crl_path = 'crl'.join(certificate.rsplit('crt', 1))
    print('--> Deleting certificate revocation list %s' % crl_path)
    run_command(client, '/certificate remove ' + crl_path)


def import_certificate(host, client):
    """Import the previously uploaded ``<host>.crt`` file on the device."""
    certificate_path = host + '.crt'
    print('--> Importing certificate')
    run_command(client, '/certificate import passphrase="" file-name=' + certificate_path)
    print('--> Imported certificate')


def import_key(host, client):
    """Import the previously uploaded ``<host>.key`` file on the device."""
    key_path = host + '.key'
    print('--> Importing private key')
    run_command(client, '/certificate import passphrase="" file-name=' + key_path)
    print('--> Imported private key')


def set_new_certificate(host, client):
    """Point the www-ssl service at the imported ``<host>.crt_0`` certificate."""
    new_certificate_name = host + '.crt_0'
    print('--> Setting new certificate to %s' % new_certificate_name)
    run_command(client, '/ip service set www-ssl certificate=' + new_certificate_name)
    print('--> New certificate installed')


def replace_certificate(host, ssh_user, ssh_key_path):
    """Upload an X509 certificate to the RouterOS device.

    Reads ``<host>.crt`` and ``<host>.key`` from ``.lego/certificates``,
    uploads them, removes the currently configured certificate (if any),
    imports the new pair and activates it. Prints an error and does nothing
    when either local file is missing.
    """
    certificate_path = os.path.join('.lego', 'certificates', host + '.crt')
    private_key_path = os.path.join('.lego', 'certificates', host + '.key')
    if not (os.path.exists(certificate_path) and os.path.exists(private_key_path)):
        print(Fore.RED + '--> Could not find certificate' + Fore.RESET)
        return

    client = connect_via_ssh(host, ssh_user, ssh_key_path)
    try:
        sftp_client = client.open_sftp()
        try:
            upload_certificate(host, sftp_client, certificate_path)
            upload_key(host, sftp_client, private_key_path)
        finally:
            sftp_client.close()
        current_certificate = get_current_certificate(client)
        if current_certificate:
            delete_certificate(client, current_certificate)
        import_certificate(host, client)
        import_key(host, client)
        set_new_certificate(host, client)
    finally:
        # Always release the SSH connection, even when a step above fails.
        client.close()


def _renew_and_replace(host, lego_exe_path, config):
    """Renew the certificate for ``host`` and install it if renewal succeeded."""
    if renew_certificate(host, lego_exe_path, config['adminEmail']):
        replace_certificate(host, config['sshUser'], config['sshKeyPath'])


def check_hosts():
    """Check every configured host and renew certificates expiring soon.

    A certificate is renewed when it expires within 15 days or when the
    HTTPS request fails with an SSL error. The process exits when Python 3
    or the ``lego`` executable is missing, or when a host is unreachable.
    """
    print('ros-tls ' + Style.BRIGHT + 'v' + __VERSION__ + Style.RESET_ALL)
    if sys.version_info[0] != 3:
        sys.exit(Fore.RED + 'python 3 is required')
    config = read_config()
    lego_exe_path = shutil.which('lego')
    if lego_exe_path is None:
        sys.exit(Fore.RED + 'lego command not found - is it installed?')

    for host in config['hosts']:
        print('-> Checking for valid certificate on %s' % host)
        try:
            # Raises SSLError when the served certificate is not valid.
            requests.get('https://' + host)
            certificate = ssl.get_server_certificate((host, 443))
            parsed_certificate = OpenSSL.crypto.load_certificate(
                OpenSSL.crypto.FILETYPE_PEM, certificate)
            not_after = datetime.datetime.strptime(
                parsed_certificate.get_notAfter().decode('utf-8'), "%Y%m%d%H%M%SZ")
            # notAfter is expressed in UTC ("Z" suffix), so compare against
            # the current UTC time rather than local wall-clock time.
            now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
            if not_after - now <= datetime.timedelta(days=15):
                print('-> Certificate needs renewing (expires soon)')
                _renew_and_replace(host, lego_exe_path, config)
            else:
                print('-> Certificate appears to be OK - doing nothing')
        except requests.exceptions.SSLError:
            print('-> Certificate needs renewing (or other SSL error)')
            _renew_and_replace(host, lego_exe_path, config)
        except requests.exceptions.ConnectionError as e:
            print(e)
            sys.exit((Fore.RED + 'Failed to connect to host %s!' + Fore.RESET) % host)


# ... (remainder of ros-tls.py is not shown in this excerpt)
# acm.py
Source:acm.py  
...31                    elif cert['Status'] != 'ISSUED':32                        continue33                    elif (datetime.timedelta(weeks=8)34                            > cert['NotAfter'] - datetime.datetime.now(cert['NotAfter'].tzinfo)):35                        renew_certificate(acm, cert)36                    domain_options = {}37                    for options in cert['DomainValidationOptions']:38                        domain_options[options['DomainName']] = options.get('ValidationDomain', options['DomainName'])39                    if domain_options == domains:40                        act.ok('found')41                        found_cert = True42                if not found_cert:43                    act.warning('nothing found')44            if not found_cert:45                request_acm_cert(acm, cert_domain_name, domains)46def renew_certificate(acm, cert):47    with ActionOnExit('Renew Certificate {}. Resend Validation...'48                      .format(cert['CertificateArn'])) as act_renew:49        for d in cert["DomainValidationOptions"]:50            try:51                acm.resend_validation_email(52                    CertificateArn=cert['CertificateArn'],53                    Domain=d["DomainName"],54                    ValidationDomain=d["ValidationDomain"]55                )56            except Exception:57                act_renew.error('found existing config')58def resend_validation_email(acm, cert):59    renewal_status = cert.get('RenewalSummary', {}).get('RenewalStatus')60    if renewal_status != 'PENDING_VALIDATION':...sdlogon.py
Source:sdlogon.py  
...38        - Retry when SDException occurs, raise any other errors39        - when the daemon is stopped, this retry is cancelled using SIGTERM40          (seems not working for now as it only stops on 'kill -9' TBC)41    """42    renew_certificate(43        openid,44        password,45        force_renew_certificate=force_renew_certificate,46    )47# 1800000 => 30mn, 86400000 => 24 hours48@retry(49    wait_exponential_multiplier=1800000,50    wait_exponential_max=86400000,51    retry_on_exception=lambda e: isinstance(e, sdexception.SDException),52)53def renew_certificate_with_retry(openid, password, force_renew_certificate=False):54    """55    Retry mecanism when ESGF IDP cannot be reached.56    Not used57    Notes58        - IDP is periodically contacted using the following schedule: 59          1h, 2h, 4h, 8h, 16h, 24h, 24h, 24h, 24h...60          (based on 2^x which gives 2, 4, 8, 16, 32, 64, 128..)61        - Retry when SDException occurs, raise any other errors62        - when the daemon is stopped, this retry is cancelled using SIGTERM63          (seems not working for now as it only stops on 'kill -9' TBC)64    """65    renew_certificate(66        openid,67        password,68        force_renew_certificate=force_renew_certificate,69    )70def renew_certificate(openid, password, force_renew_certificate=False, force_renew_ca_certificates=False):71    """Renew ESGF certificate using sdmyproxy module."""72    # extract info from openid73    success, hostname, port, username = sdopenid.extract_info_from_openid(openid)74    if success:75        try:76            sdmyproxy.run(77                hostname,78                port,79                username,80                force_renew_certificate,81                force_renew_ca_certificates,82                password,83            )84        except Exception as e:85            sdlog.error(86                "SYDLOGON-012",87                "Error occured while retrieving certificate from myproxy server (%s)" % 
str(e),88            )89            raise90    return success91# init.92if __name__ == '__main__':93    from synda.source.config.file.user.credentials.models import Config as Credentials94    parser = argparse.ArgumentParser()95    args = parser.parse_args()96    credentials_ = Credentials()97    renew_certificate(98        credentials_.openid,99        credentials_.password,100        force_renew_certificate=True,101    )...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
