Best Python code snippet using lettuce_webdriver_python
check.py
Source:check.py  
...12    if ssh_userhost is not None:13        message = '{}<click>autoterm -e ssh {}</click>'.format(message, ssh_userhost)14    print('<txt><span foreground="#FF7777">{}</span></txt>'.format(message))15    exit(0)16def check_alert(check_func, ssh_userhost=None):17    """18    Check if alert is needed19    """20    (is_ok, errors) = check_func()21    if not is_ok:22        alert(errors, ssh_userhost)23def ping_check(host):24    """25    Check if host is up26    """27    ping_cmd = 'ping -c 1 {}'.format(host)28    (exitcode, output) = subprocess.getstatusoutput(ping_cmd)29    if exitcode != 0:30        return (False, 'Failed to ping {}'.format(host))31    return (True, None)32def check_server(ssh_userhost):33    """34    Check if server is up and running using OpenSSH client35    """36    host = ssh_userhost.split('@')[1]37    ssh_cmd = 'ssh {} python3 - < {}/print_stats.py'.format(ssh_userhost, os.path.dirname(os.path.realpath(__file__)))38    (exitcode, output) = subprocess.getstatusoutput(ssh_cmd)39    if exitcode != 0:40        return (False, '{}: Failed to fetch stats'.format(host))41    errors = []42    status = json.loads(output)43    if type(status) != dict:44        return (False, '{}: Failed to parse stats ({} returned)'.format(host, type(status)))45    for partition in status['df']:46        kbytes_avail = status['df'][partition][0]47        kbytes_total = status['df'][partition][1]48        if kbytes_avail < 1024*1024*1024*5 or kbytes_avail < kbytes_total * 0.1:49            errors.append('{} {} has just {:.2f} GiB bytes left'.format(host, partition, status['df'][partition][0] / 1024 / 1024 / 1024))50    if status['la'][2] > 0.5:51        errors.append('{} la is {:.2f}'.format(host, status['la'][2]))52    if status['avail_mem'] < 0.25:53        errors.append('{} free memory is {:.2f}%'.format(host, status['avail_mem'] * 100))54    if status['free_swap'] < 0.5:55        errors.append('{} free swap is {:.2f}%'.format(host, status['free_swap'] * 100))56    
return (len(errors) == 0, ", ".join(errors))57def urlopen(url):58    req = urllib.request.Request(59        url,60        data=None,61        headers={62            'User-Agent': 'UptimeRobot/1.0'63        }64    )65    return urllib.request.urlopen(req)66def check_http_ok(url):67    """68    Check if HTTP request returns 200 OK69    """70    try:71        with urlopen(url) as response:72            if response.code != 200:73                return (False, '{}: HTTP request failed with code {}'.format(url, response.code))74            contents = response.read()75            if contents != b'OK':76                return (False, '{}: {}'.format(url, contents))77            return (True, None)78    except Exception as e:79        return (False, 'Failed to check {} ({})'.format(url, e))80def check_http_contains(url, text):81    """82    Check if HTTP request returns 200 OK83    """84    try:85        with urlopen(url) as response:86            if response.code != 200:87                return (False, '{}: HTTP request failed with code {}'.format(url, response.code))88            contents = response.read()89            if contents.decode('utf-8').find(text) == -1:90                return (False, '{}: "{}" not found'.format(url, text))91            return (True, None)92    except Exception as e:93        return (False, 'Failed to check {} ({})'.format(url, e))94def check_dynadot_expiring_domains(api_key, ditch_domains):95    """96    Check if Dynadot has expiring domains97    """98    url = 'https://api.dynadot.com/api3.xml?key={}&command=list_domain'.format(api_key)99    try:100        with urlopen(url) as response:101            if response.code != 200:102                return (False, 'Dynadot HTTP request failed with code {}'.format(response.code))103            # create element tree object104            tree = ET.parse(response)105            # get root element106            root = tree.getroot()107            errors = []108            domain_items = 
root.findall('ListDomainInfoContent/DomainInfoList/DomainInfo/Domain')109            for domain_item in domain_items:110                name = domain_item.find('Name').text111                if name in ditch_domains:112                    continue113                expiry_days = int((int(domain_item.find('Expiration').text) / 1000 - int(time.time())) / (60 * 60 * 24))114                if expiry_days < 0:115                    errors.append('Dynadot domain {} expired {} days ago'.format(name, -expiry_days))116                elif expiry_days < 183:117                    errors.append('Dynadot domain {} is expiring in {} days'.format(name, expiry_days))118            return (len(errors) == 0, ", ".join(errors))119    except Exception as e:120        return (False, 'Failed to check {} ({})'.format(url, e))121conf_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'conf.json')122conf = json.load(open(conf_path))123check_alert(lambda: ping_check('1.1.1.1'))124if 'dynadot' in conf:125    dynadot_conf = conf['dynadot']126    if 'apiKey' in dynadot_conf:127        if 'ditchDomains' in dynadot_conf:128            ditch = dynadot_conf['ditchDomains']129        else:130            ditch = []131        check_alert(lambda: check_dynadot_expiring_domains(dynadot_conf['apiKey'], ditch))132for server in conf['sshServers']:133    check_alert(lambda: check_server(server), server)134for url, ssh_userhost in conf['httpExpectOk'].items():135    check_alert(lambda: check_http_ok(url), ssh_userhost)136for url, text in conf['httpFind'].items():137    check_alert(lambda: check_http_contains(url, text))...nodes_diagnostic.py
Source:nodes_diagnostic.py  
class NodesDiagnostic:
    """Build human-readable alert lines from a node's resource counters.

    Each ``alert_*`` method reads counters from the ``node`` mapping and
    thresholds from the ``conditions`` mapping and returns one message line
    terminated by ``\\r\\n``.
    """

    def __init__(self, logger):
        # Stored for later use; none of the methods visible here log anything.
        self.logger = logger

    def check_alert(self, actual_value, total_value, warn_value,
                    critical_value, message):
        """Append a usage percentage and its severity to ``message``.

        The percentage is ``actual_value * 100 / total_value`` rounded to two
        decimals. It is "Critical" at or above ``critical_value``, "Warning"
        above ``warn_value``, and "OK" otherwise.

        Raises:
            ZeroDivisionError: if ``total_value`` is zero.
        """
        percent_value = round((actual_value * 100) / total_value, 2)
        # ">=" so that a value sitting exactly on the critical threshold is
        # not reported as OK (it previously fell through both comparisons).
        if percent_value >= critical_value:
            level = 'Critical'
        elif percent_value > warn_value:
            level = 'Warning'
        else:
            level = 'OK'
        return message + ' : {}% - {}\r\n'.format(percent_value, level)

    def alert_file_description(self, node, conditions):
        """Return the file-descriptor usage line for ``node``."""
        fd_used = node['fd_used']
        fd_total = node['fd_total']
        fd_result = 'File Descriptors Alert'
        return self.check_alert(fd_used, fd_total,
                                conditions['file_descriptors_used_percent_warn'],
                                conditions['file_descriptors_used_percent_critical'],
                                fd_result)

    def alert_files_description_as_sockets(self, node, conditions):
        """Return the socket usage line for ``node``."""
        sd_used = node['sockets_used']
        sd_total = node['sockets_total']
        sd_result = 'File Descriptors as Sockets Alert'
        return self.check_alert(sd_used, sd_total,
                                conditions['file_descriptors_used_as_sockets_percent_warn'],
                                conditions['file_descriptors_used_as_sockets_percent_critical'],
                                sd_result)

    def alert_disk_free(self, node, conditions):
        """Return the free-disk line for ``node``.

        "Warning" when free space is below ``node['disk_free_limit']``,
        "Critical" when it is at or below half of that limit, "OK" otherwise.
        ``conditions`` is accepted for signature parity with the other
        ``alert_*`` methods but is not read.
        """
        disk_free = node['disk_free']
        disk_free_limit = node['disk_free_limit']
        message = 'Disk Free Limit is {} actual value is {}: '.format(disk_free_limit, disk_free)
        disk_free_limit_critical = disk_free_limit / 2
        # "<=" closes the gap where free space exactly equal to half the limit
        # was reported as OK.
        if disk_free <= disk_free_limit_critical:
            # Spelled "Critical" to match check_alert (was "Critial").
            message += ' Critical\r\n'
        elif disk_free < disk_free_limit:
            message += ' Warning\r\n'
        else:
            message += ' OK\r\n'
        return message

    def alert_mem_free(self, node, conditions):
        """Return the memory usage line for ``node``."""
        mem_used = node['mem_used']
        mem_limit = node['mem_limit']
        mem_result = 'Mem Free Alert'
        return self.check_alert(mem_used, mem_limit,
                                conditions['memory_used_percent_warn'],
                                conditions['memory_used_percent_critical'],
                                mem_result)

    def alert_erlang_process(self, node, conditions):
        """Return the Erlang process usage line for ``node``."""
        proc_used = node['proc_used']
        proc_total = node['proc_total']
        proc_result = 'Erlang processes used Alert '
        # NOTE(review): the listing is cut off after the critical threshold
        # argument; the final `proc_result)` is inferred from the sibling
        # methods -- confirm against the real file.
        return self.check_alert(proc_used, proc_total,
                                conditions['erlang_process_percent_warn'],
                                conditions['erlang_process_percent_critical'],
                                proc_result)

    # ... (the remainder of nodes_diagnostic.py is elided in this listing)

# test.py
Source:test.py  
import json
from time import time

import cv2
import numpy as np
import requests
from tensorflow.keras.models import load_model

# NOTE(review): live-looking FCM credentials are hard-coded below; they should
# be rotated and loaded from the environment or a secrets store rather than
# kept in source.
serverToken = 'AAAAO5T7NUo:APA91bFj3mgtfeVJUp8OM7AqVOXhvPMTjbWJpncUnD2f3jXEkxDXh3F5eBqRvxTMFyUopxQOa-kcALIiNNrjdXN4RqZzuxTF0qXhgw2OnJcTqVWPtT_qVcV_1Cbj02rWHff0w3Y90O-5'
# Not referenced anywhere in the visible part of the script (the message is
# sent to a topic, not to a single device).
deviceToken = 'dMYdDuEwMFtJ58w_L42fv-:APA91bESoIFbnmG-6bkLN6a0VFqHCMhQ8YPndtAz1snjTpoGnTKjFkrPBr9K6pxck3uJDkZzkHjVXgOTZFITZ9alixSPelkIoiCfTeQkvoa4uXDnl88ssWvbko92Q-MvIzGZVSffaA9z'
headers = {
        'Content-Type': 'application/json',
        'Authorization': 'key=' + serverToken,
      }
body = {
          'notification': {'title': 'Alert Hight Risk',
                            'body': 'A Person Entered the Store without Mask.',
                            },
          "to": "/topics/Alert",
          'priority': 'high',
        #   'data': dataPayLoad,
        }


def send_alert():
    """POST the fixed notification `body` to FCM and print the outcome.

    Network failures are printed instead of raised so that a flaky connection
    cannot take down the video loop that calls this function.
    """
    try:
        response = requests.post(
            "https://fcm.googleapis.com/fcm/send",
            headers=headers,
            data=json.dumps(body),
            timeout=10,  # never block the capture loop indefinitely
        )
    except requests.RequestException as e:
        print('Failed to send alert: {}'.format(e))
        return
    print(response.status_code)
    try:
        print(response.json())
    except ValueError:
        # Error responses are not guaranteed to be JSON.
        print(response.text)


model = load_model("./model2-019.model")
results = {0: 'without mask', 1: 'mask'}
GR_dict = {0: (0, 0, 255), 1: (0, 255, 0)}
rect_size = 4  # frames are shrunk by this factor before face detection
cap = cv2.VideoCapture(1)
# NOTE(review): absolute, machine-specific path to the cascade file.
haarcascade = cv2.CascadeClassifier('/Users/vigneshkkar/opt/anaconda3/lib/python3.8/site-packages/cv2/data/haarcascade_frontalface_alt.xml')

# Alert state: `check_alert` is True while an unmasked face is being timed
# (since `time_diff`); `alert_sent` suppresses repeats until a mask is seen.
check_alert = False
alert_sent = False
time_diff = time()

try:
    while True:
        (rval, im) = cap.read()
        if not rval:
            # No frame was grabbed; `im` is unusable, so stop instead of
            # crashing inside cv2.flip.
            break
        im = cv2.flip(im, 1, 1)

        rerect_size = cv2.resize(im, (im.shape[1] // rect_size, im.shape[0] // rect_size))
        faces = haarcascade.detectMultiScale(rerect_size)
        for f in faces:
            # Scale the detection back up to full-frame coordinates.
            (x, y, w, h) = [v * rect_size for v in f]

            face_img = im[y:y+h, x:x+w]
            rerect_sized = cv2.resize(face_img, (150, 150))
            normalized = rerect_sized / 255.0
            reshaped = np.reshape(normalized, (1, 150, 150, 3))
            reshaped = np.vstack([reshaped])
            result = model.predict(reshaped)

            label = np.argmax(result, axis=1)[0]

            cv2.rectangle(im, (x, y), (x+w, y+h), GR_dict[label], 2)
            cv2.rectangle(im, (x, y-40), (x+w, y), GR_dict[label], -1)
            cv2.putText(im, results[label], (x, y-10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)

            if label == 1:
                # Mask seen: reset the timer and allow a future alert.
                check_alert = False
                alert_sent = False
            if label == 0 and not check_alert:
                # First unmasked detection: start timing.
                check_alert = True
                time_diff = time()
            if int(time() - time_diff) > 5 and check_alert and not alert_sent:
                send_alert()
                check_alert = False
                alert_sent = True

        cv2.imshow('Face Mask Detection',   im)
        key = cv2.waitKey(10)

        if key == 27:  # Esc
            break
finally:
    # Release the camera even if the loop dies with an exception.
    cap.release()

# ... (the remainder of test.py is elided in this listing)

# Learn to execute automation testing from scratch with LambdaTest Learning
# Hub. Right from setting up the prerequisites to run your first automation
# test, to following best practices and diving deeper into advanced test
# scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to
# help you be proficient with different test automation frameworks i.e.
# Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
