Best Python code snippet using autotest_python
message.py
Source:message.py  
1import re2from time import sleep3from selenium import webdriver4from selenium.common.exceptions import NoSuchElementException, TimeoutException5from selenium.webdriver.common.by import By6from selenium.webdriver.support import expected_conditions as EC7from selenium.webdriver.support.wait import WebDriverWait8from webdriver_manager.chrome import ChromeDriverManager9# Correctly fill in the variables below, and then run this script with python3 message.py10username = 'your_username'11password = 'your_password'12# The URL of the group you want to post to. You MUST be a member of this group. Make sure there's no slash at the end.13group_link = 'https://www.linkedin.com/groups/10356456'14# The regex you want to look for in the person's title.15# For instance, if I want to look for CEOs, I would add '.*ceo.*'. Case insensitive.16member_title_regex_list = ['.*founder .*', '.*partner .*']17# A list of locations you want to contact people in.18# If I wanted to contact people in India and Canada, I would set this to ['India', 'Canada']19locations = ['India']20# The types of organisations the people you want to contact work at. There is a fixed list of types set by LinkedIn.21org_types = ['Legal Services', 'Law Practice']22# Set the following to True if you want to message people whose org types are unclear. False otherwise.23message_if_org_type_unclear = True24# The message you want to send people matching these descriptions.25# Potential inputs are {member_full_name}, {member_first_name}, {member_position}, {current_org}26message = "Hi {member_first_name}, \n\n" \27          "My name is Utkarsh and I'm co-founder of a LegalTech startup called Firmation (www.firmation.in). " \28          "We're both members of the Legal Transformation with Technology group, " \29          "and I saw that you're a decision maker at {current_org}, " \30          "so I wanted to reach out. \n\n" \31          "Many of the lawyers we've spoken to have complained that associates do not log their time regularly, " \32          "waste billable hours doing this at a later date, and sometimes even forget to log time for short " \33          "phone calls and email opinions, which means money lost.\n\n" \34          "To help address this issue, we're currently working on software to help lawyers automatically keep track " \35          "of time they spend working. \n\n" \36          "Does this sound like it could be useful to you? If so, I would love to set up a quick 20-minute chat " \37          "to discuss how our software could help you. \n\n" \38          "Thank you, and looking forward to hearing from you!\n\n" \39          "Utkarsh"40def check_if_member_messaged(linkedin_profile_url):41    # Open the file in read only mode42    with open('messaged_members.txt', 'r') as read_obj:43        # Read all lines in the file one by one44        for line in read_obj:45            # For each line, check if line contains the string46            if linkedin_profile_url in line:47                return True48    return False49def write_member_url_to_file(linkedin_profile_url):50    # Open the file in read only mode51    with open('messaged_members.txt', 'a') as file:52        # Read all lines in the file one by one53        file.write(linkedin_profile_url + '\n')54options = webdriver.ChromeOptions()55options.add_argument('--ignore-certificate-errors')56options.add_argument('--incognito')57# options.add_argument('--headless')58driver = webdriver.Chrome(ChromeDriverManager().install(), chrome_options=options)59driver.get('https://www.linkedin.com/login?fromSignIn=true&trk=guest_homepage-basic_nav-header-signin')60username_field = driver.find_element_by_id('username')61username_field.send_keys(username)62password_field = driver.find_element_by_id('password')63password_field.send_keys(password)64driver.find_element_by_xpath('//button[text()="Sign in"]').click()65driver.get(f'{group_link}/members')66members = driver.find_elements_by_class_name('groups-members-list__typeahead-result')67regex_list_str = '(?:% s)' % '|'.join(map(str.lower, member_title_regex_list))68ctr = 069while True:70    members = driver.find_elements_by_class_name('groups-members-list__typeahead-result')71    while ctr < len(members):72        member = members[ctr]73        try:74            member_title = member.find_element_by_class_name('artdeco-entity-lockup__subtitle').text75            if re.match(regex_list_str, member_title.lower()):76                member_full_name = member.find_element_by_class_name('artdeco-entity-lockup__title').text77                member_first_name = member_full_name.split(' ')[0]78                member_position = member_title.split(' at ')[0]79                message_member = False80                href = member.find_element_by_class_name("ui-entity-action-row__link").get_attribute("href")81                # handle current tab82                first_tab = driver.window_handles[0]83                # open new tab with specific url84                driver.execute_script("window.open('" + href + "');")85                # hadle new tab86                second_tab = driver.window_handles[1]87                # switch to second tab88                driver.switch_to.window(second_tab)89                member_url = driver.current_url90                if not check_if_member_messaged(member_url):91                    member_location = driver.find_element_by_class_name('pv-top-card--list-bullet').text92                    if any(location in member_location for location in locations):93                        try:94                            current_org_item = driver.find_element_by_class_name('pv-top-card--experience-list-item')95                            current_org = current_org_item.text96                            current_org_item.click()97                            wait = WebDriverWait(driver, 10).until(EC.presence_of_element_located98                                                                   ((By.CLASS_NAME, 'pv-entity__summary-info')))99                            experience_list = driver.find_element_by_class_name('pv-entity__summary-info').click()100                            wait = WebDriverWait(driver, 5).until(EC.presence_of_element_located101                                                                  ((By.CLASS_NAME,102                                                                    'org-top-card-summary-info-list__info-item')))103                            organisation_type = \104                            driver.find_elements_by_class_name('org-top-card-summary-info-list__info-item')[0].text105                            if organisation_type in org_types:106                                message_member = True107                        except TimeoutException as e:108                            print('No experience or company does not exist')109                            if message_if_org_type_unclear:110                                message_member = True111                else:112                    print("Already messaged this member, skipping.")113                driver.close()114                # switch to first tab115                driver.switch_to.window(first_tab)116                if message_member:117                    member.find_element_by_class_name('message-anywhere-button').click()118                    message_text_field = driver.find_element_by_class_name('msg-form__contenteditable')119                    message_to_send = message.format(member_full_name=member_full_name,120                                                     member_first_name=member_first_name,121                                                     member_position=member_position,122                                                     current_org=current_org)123                    print(f'Messaging {member_full_name} who is {member_position} at {current_org}: {message_to_send}')124                    message_text_field.send_keys(message_to_send)125                    driver.find_element_by_class_name('msg-form__send-button').click()126                    driver.find_element_by_xpath(127                        "//button[@data-control-name='overlay.close_conversation_window']").click()128                    write_member_url_to_file(member_url)129                    try:130                        driver.find_element_by_class_name("mlA").click()131                    except NoSuchElementException as e:132                        print("No confirm discard button")133        except Exception as e:134            print(e)135        ctr += 1136    driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")137    new_members_len = len(driver.find_elements_by_class_name('groups-members-list__typeahead-result'))138    load_iterations = 0139    while new_members_len == len(members) and load_iterations < 100:140        new_members_len = len(driver.find_elements_by_class_name('groups-members-list__typeahead-result'))141        load_iterations += 1142        sleep(0.1)143    if new_members_len == len(members):...ab.py
Source:ab.py  
1"""2Module holds all stuff regarding usage of Apache Benchmark3Copyright 2016 BlazeMeter Inc.4Licensed under the Apache License, Version 2.0 (the "License");5you may not use this file except in compliance with the License.6You may obtain a copy of the License at7   http://www.apache.org/licenses/LICENSE-2.08Unless required by applicable law or agreed to in writing, software9distributed under the License is distributed on an "AS IS" BASIS,10WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.11See the License for the specific language governing permissions and12limitations under the License.13"""14import logging15import re16from math import ceil17from distutils.version import LooseVersion18from bzt import TaurusConfigError19from bzt.engine import ScenarioExecutor, HavingInstallableTools, SelfDiagnosable20from bzt.modules.aggregator import ConsolidatingAggregator, ResultsReader21from bzt.modules.console import WidgetProvider, ExecutorWidget22from bzt.requests_model import HTTPRequest23from bzt.utils import iteritems, CALL_PROBLEMS, shutdown_process, RequiredTool, dehumanize_time, FileReader24class ApacheBenchmarkExecutor(ScenarioExecutor, WidgetProvider, HavingInstallableTools, SelfDiagnosable):25    """26    Apache Benchmark executor module27    """28    def __init__(self):29        super(ApacheBenchmarkExecutor, self).__init__()30        self.log = logging.getLogger('')31        self.process = None32        self._tsv_file = None33        self.tool = None34        self.scenario = None35    def prepare(self):36        super(ApacheBenchmarkExecutor, self).prepare()37        self.scenario = self.get_scenario()38        self.install_required_tools()39        self._tsv_file = self.engine.create_artifact("ab", ".tsv")40        self.stdout = open(self.engine.create_artifact("ab", ".out"), 'w')41        self.stderr = open(self.engine.create_artifact("ab", ".err"), 'w')42        self.reader = TSVDataReader(self._tsv_file, self.log)43        if isinstance(self.engine.aggregator, ConsolidatingAggregator):44            self.engine.aggregator.add_underling(self.reader)45    def get_widget(self):46        """47        Add progress widget to console screen sidebar48        :return:49        """50        if not self.widget:51            label = "%s" % self52            self.widget = ExecutorWidget(self, "ab: " + label.split('/')[1])53        return self.widget54    def __first_http_request(self):55        for request in self.scenario.get_requests():56            if isinstance(request, HTTPRequest):57                return request58        return None59    def startup(self):60        args = [self.tool.tool_path]61        load = self.get_load()62        load_iterations = load.iterations or 163        load_concurrency = load.concurrency or 164        if load.hold:65            hold = int(ceil(dehumanize_time(load.hold)))66            args += ['-t', str(hold)]67        else:68            args += ['-n', str(load_iterations * load_concurrency)]  # ab waits for total number of iterations69        timeout = self.get_scenario().get("timeout", None)70        if timeout:71            args += ['-s', str(ceil(dehumanize_time(timeout)))]72        args += ['-c', str(load_concurrency)]73        args += ['-d']  # do not print 'Processed *00 requests' every 100 requests or so74        args += ['-r']  # do not crash on socket level errors75        if self.tool.version and LooseVersion(self.tool.version) >= LooseVersion("2.4.7"):76            args += ['-l']  # accept variable-len responses77        args += ['-g', str(self._tsv_file)]  # dump stats to TSV file78        # add global scenario headers79        for key, val in iteritems(self.scenario.get_headers()):80            args += ['-H', "%s: %s" % (key, val)]81        requests = self.scenario.get_requests()82        if not requests:83            raise TaurusConfigError("You must specify at least one request for ab")84        if len(requests) > 1:85            self.log.warning("ab doesn't support multiple requests. Only first one will be used.")86        request = self.__first_http_request()87        if request is None:88            raise TaurusConfigError("ab supports only HTTP requests, while scenario doesn't have any")89        # add request-specific headers90        for key, val in iteritems(request.headers):91            args += ['-H', "%s: %s" % (key, val)]92        if request.method != 'GET':93            raise TaurusConfigError("ab supports only GET requests, but '%s' is found" % request.method)94        if request.priority_option('keepalive', default=True):95            args += ['-k']96        args += [request.url]97        self.reader.setup(load_concurrency, request.label)98        self.process = self._execute(args)99    def check(self):100        ret_code = self.process.poll()101        if ret_code is None:102            return False103        if ret_code != 0:104            self.log.warning("ab tool exited with non-zero code: %s", ret_code)105        return True106    def shutdown(self):107        shutdown_process(self.process, self.log)108    def install_required_tools(self):109        self.tool = self._get_tool(ApacheBenchmark, config=self.settings)110        if not self.tool.check_if_installed():111            self.tool.install()112    def get_error_diagnostics(self):113        diagnostics = []114        if self.stdout is not None:115            with open(self.stdout.name) as fds:116                contents = fds.read().strip()117                if contents.strip():118                    diagnostics.append("ab STDOUT:\n" + contents)119        if self.stderr is not None:120            with open(self.stderr.name) as fds:121                contents = fds.read().strip()122                if contents.strip():123                    diagnostics.append("ab STDERR:\n" + contents)124        return diagnostics125class TSVDataReader(ResultsReader):126    def __init__(self, filename, parent_logger):127        super(TSVDataReader, self).__init__()128        self.log = parent_logger.getChild(self.__class__.__name__)129        self.file = FileReader(filename=filename, parent_logger=self.log)130        self.skipped_header = False131        self.concurrency = None132        self.url_label = None133    def setup(self, concurrency, url_label):134        self.concurrency = concurrency135        self.url_label = url_label136        return True137    def _read(self, last_pass=False):138        lines = self.file.get_lines(size=1024 * 1024, last_pass=last_pass)139        for line in lines:140            if not self.skipped_header:141                self.skipped_header = True142                continue143            log_vals = [val.strip() for val in line.split('\t')]144            _error = None145            _rstatus = None146            _url = self.url_label147            _concur = self.concurrency148            _tstamp = int(log_vals[1])  # timestamp - moment of request sending149            _con_time = float(log_vals[2]) / 1000.0  # connection time150            _etime = float(log_vals[4]) / 1000.0  # elapsed time151            _latency = float(log_vals[5]) / 1000.0  # latency (aka waittime)152            _bytes = None153            yield _tstamp, _url, _concur, _etime, _con_time, _latency, _rstatus, _error, '', _bytes154class ApacheBenchmark(RequiredTool):155    def __init__(self, config=None, **kwargs):156        settings = config or {}157        tool_path = settings.get('path', 'ab')158        super(ApacheBenchmark, self).__init__(tool_path=tool_path, installable=False, **kwargs)159    def _get_version(self, output):160        version = re.findall("Version\s(\S+)\s", output)161        if not version:162            self.log.warning("%s tool version parsing error: %s", self.tool_name, output)163        else:164            return version[0]165    def check_if_installed(self):166        self.log.debug('Trying %s: %s', self.tool_name, self.tool_path)167        try:168            out, err = self.call([self.tool_path, '-V'])169        except CALL_PROBLEMS as exc:170            self.log.warning("%s check failed: %s", self.tool_name, exc)171            return False172        self.version = self._get_version(out)173        self.log.debug("%s check stdout: %s", self.tool_name, out)174        if err:175            self.log.warning("%s check stderr: %s", self.tool_name, err)...incremental_reobserve.py
Source:incremental_reobserve.py  
1#!/usr/bin/env python32# -*- coding: utf-8 -*-3import itertools4import logging5import os6import random7import sys8import time9from pathlib import Path10from typing import List11import dill12import matplotlib.pyplot as plt13import mayavi.mlab as mlab14import numpy as np15import seaborn as sb16import sympy as sym17from matplotlib import rc, rcParams18from mpl_toolkits.mplot3d import Axes3D19from noise import pnoise1, pnoise220from opensimplex import OpenSimplex21from scipy import stats22from scipy.linalg import block_diag23from scipy.special import gamma as gammafn24from scipy.stats import gamma, multivariate_normal25from tqdm import tqdm26from transformations import *27from stmmap import MessageCounter, RelativeSubmap28from stmmap.utils.plot_utils import PlotConfig29from utils.generate_environments import gen_env_fn_3d, gen_meas_3d30# -------- Plot configurations ---------------------------------------------------------31plt_cfg = PlotConfig()32# -------- Logging configuration -------------------------------------------------------33loggers: List[logging.Logger] = []34loggers += [logging.getLogger(__name__)]35loggers += [logging.getLogger("stmmap")]36level = logging.DEBUG  # This could be controlled with --log=DEBUG (I think)37# create formatter and add it to the handlers38FORMAT = "%(asctime)s - %(name)s::%(funcName)s - %(levelname)s - %(message)s"39formatter = logging.Formatter(FORMAT)40ch = logging.StreamHandler()41ch.setLevel(level)42ch.setFormatter(formatter)43for logger in loggers:44    logger.setLevel(level)45    logger.addHandler(ch)46root_logger = loggers[0]47root_logger.debug("Logging initialised")48# -------- Constants and Flags ----------------------------------------------------------49seed = (int)(np.random.rand() * (2.0 ** 30))50# seed =51# seed = 6649082452# seed = 30707211753# seed = 33522088254# seed = 4895687655seed = 86141247756np.random.seed(seed)57random.seed(seed)58print("Seed", seed)59# Measurement constants60# N_z_gen = 5000061# N_updates = 3062# grid_depth = 563N_z_gen = 10 * 4 ** 564N_updates = 5065grid_depth = 566env_type = "perlin"67func = gen_env_fn_3d(env_type, seed=seed)68directory = Path(__file__).parents[0] / "results"69if not os.path.exists(directory):70    os.makedirs(directory)71directory /= "incremental"72directory /= "reobserve"73if not os.path.exists(directory):74    os.makedirs(directory)75load_iterations = True76if load_iterations == False:77    root_logger.debug("Generating new measurements and updating")78    grid = RelativeSubmap(max_depth=grid_depth, tolerance=1e-5)79    normalised_iterations = []80    iterations = []81    # NOTE This is a pseudo update. There is something weird with just the first update. It add more messages than needed I think82    C = 10000 * np.eye(3).reshape(1, 3, 3)83    grid.insert_measurements(np.zeros((1, 3, 1)), C)84    update_iter = grid.update()85    for i in range(N_updates):86        print("Update number:", i)87        # Generate Measurements88        print(i, "------- Generating Measurements -------")89        m_z_stack, C_z_stack = gen_meas_3d(func, N_z_gen, scale=1)90        b = (91            (m_z_stack[:, 0, 0] + m_z_stack[:, 1, 0] <= 1)92            & (m_z_stack[:, 0, 0] > 0)93            & (m_z_stack[:, 0, 0] < 1)94            & (m_z_stack[:, 1, 0] > 0)95            & (m_z_stack[:, 1, 0] < 1)96        )97        m_z_stack = m_z_stack[b, :, :]98        C_z_stack = C_z_stack[b, :, :]99        N_z = m_z_stack.shape[0]100        print(i, "N: total =", N_z_gen, " remainder = ", N_z)101        # Calculate approximation102        print(i, "------- Fusing Measurements -------")103        grid.insert_measurements(m_z_stack, C_z_stack)104        t0 = time.time()105        update_iter: MessageCounter = grid.update()106        normalised_iterations.append(len(update_iter) / N_z)107        iterations.append(len(update_iter))108        t1 = time.time()109        tot = t1 - t0110        print(i, "Approximation runtime:", tot, "s")111    np.savez(open(directory / "iterations.npz", "wb"), iterations, normalised_iterations)112else:113    root_logger.debug("Loading previously calculated results")114    # Load iterations if you want to reuse115    iterations = np.load(open(directory / "iterations.npz", "rb"))["arr_0"]116    normalised_iterations = np.load(open(directory / "iterations.npz", "rb"))["arr_1"]117ratio = 7 / 16118fig_scale = 0.98119fs = (fig_scale * plt_cfg.tex_textwidth, fig_scale * plt_cfg.tex_textwidth * ratio)120fig = plt.figure(figsize=fs, constrained_layout=True)121ax = plt.gca()122# ax_right = ax.twinx()123p1, = ax.plot(iterations, marker=".", color="C0", markersize=4)124# p1, = ax.plot(np.array(iterations) / 4**grid_depth, marker=".", color="C0")125# p2, = ax_right.plot(normalised_iterations, marker=".", color="C1")126ax.set_xlabel("Time step")127ax.set_ylabel("Number of messages passed")128# ax.set_ylabel("Average number of messages passed per surfel")129# ax_right.set_ylabel("Normalised number of messages passed")130ax.ticklabel_format(style="sci", scilimits=(0, 0), axis="y")131# tkw = dict(size=4, width=1.5)132# ax.tick_params(axis="y", colors=p1.get_color(), **tkw)133# ax_right.tick_params(axis="y", colors=p2.get_color(), **tkw)134# base = 10135# ax.set_ylim(0, base * np.ceil(np.max(iterations) * 1.05 / base))136# ax_right.set_ylim(0, base * np.ceil(np.max(normalised_iterations) * 1.05 / base))137# N_surfels = 4**grid_depth138# ln = ax.axhline(N_surfels, color="r", linestyle="-.", label="Number of surfels")139# bgcol = ax.get_facecolor()140# ax.text(141#     0.2,142#     N_surfels,143#     "Number of surfels",144#     ha="center",145#     va="center",146#     rotation="horizontal",147#     backgroundcolor=bgcol,148#     transform=ln.get_transform(),149# )150# ax.legend()151# plt.xticks(fontsize=tex_fontsize - 1)152# plt.yticks(fontsize=tex_fontsize - 1)153filename = directory / f"{env_type}_{seed}.pdf"154plt.savefig(filename)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
