How to use draw_bytes method in hypothesis

Best Python code snippet using hypothesis

test_conjecture_engine.py

Source:test_conjecture_engine.py Github

copy

Full Screen

...36 return hbytes(runner.last_data.buffer)37def test_can_index_results():38 @run_to_buffer39 def f(data):40 data.draw_bytes(5)41 data.mark_interesting()42 assert f.index(0) == 043 assert f.count(0) == 544def test_non_cloneable_intervals():45 @run_to_buffer46 def x(data):47 data.draw_bytes(10)48 data.draw_bytes(9)49 data.mark_interesting()50 assert x == hbytes(19)51def test_duplicate_buffers():52 @run_to_buffer53 def x(data):54 t = data.draw_bytes(10)55 if not any(t):56 data.mark_invalid()57 s = data.draw_bytes(10)58 if s == t:59 data.mark_interesting()60 assert x == bytes_from_list([0] * 9 + [1]) * 261def test_clone_into_variable_draws():62 @run_to_buffer63 def x(data):64 small = 065 large = 066 for _ in range(30):67 data.start_example()68 b = data.draw_bytes(1)[0] & 169 if b:70 data.draw_bytes(3)71 large += 172 else:73 data.draw_bytes(2)74 small += 175 data.stop_example()76 if small < 10:77 data.mark_invalid()78 if large >= 10:79 data.mark_interesting()80 assert set(x) == set((0, 1))81 assert x.count(1) == 1082 assert len(x) == 30 + (20 * 2) + (10 * 3)83def test_deletable_draws():84 @run_to_buffer85 def x(data):86 while True:87 x = data.draw_bytes(2)88 if x[0] == 255:89 data.mark_interesting()90 assert x == hbytes([255, 0])91def zero_dist(random, n):92 return hbytes(n)93def test_distribution_may_be_ignored():94 @run_to_buffer95 def x(data):96 t = data.draw_bytes(5, zero_dist)97 if all(t) and 255 in t:98 data.mark_interesting()99 assert x == hbytes([1] * 4 + [255])100def test_can_load_data_from_a_corpus():101 key = b'hi there'102 db = ExampleDatabase()103 value = b'=\xc3\xe4l\x81\xe1\xc2H\xc9\xfb\x1a\xb6bM\xa8\x7f'104 db.save(key, value)105 def f(data):106 if data.draw_bytes(len(value)) == value:107 data.mark_interesting()108 runner = TestRunner(109 f, settings=settings(database=db), database_key=key)110 runner.run()111 assert runner.last_data.status == Status.INTERESTING112 assert runner.last_data.buffer == value113 assert len(list(db.fetch(key))) == 1114def test_terminates_shrinks():115 shrinks = [-1]116 def tf(data):117 x = hbytes(data.draw_bytes(100))118 if sum(x) >= 500:119 shrinks[0] += 1120 data.mark_interesting()121 runner = TestRunner(tf, settings=settings(122 max_examples=5000, max_iterations=10000, max_shrinks=10,123 database=None,124 ))125 runner.run()126 assert runner.last_data.status == Status.INTERESTING127 # There's an extra non-shrinking check step to abort in the presence of128 # flakiness129 assert shrinks[0] == 11130def test_detects_flakiness():131 failed_once = [False]132 count = [0]133 def tf(data):134 data.draw_bytes(1)135 count[0] += 1136 if not failed_once[0]:137 failed_once[0] = True138 data.mark_interesting()139 runner = TestRunner(tf)140 runner.run()141 assert count == [2]142def test_variadic_draw():143 def draw_list(data):144 result = []145 while True:146 data.start_example()147 d = data.draw_bytes(1)[0] & 7148 if d:149 result.append(data.draw_bytes(d))150 data.stop_example()151 if not d:152 break153 return result154 @run_to_buffer155 def b(data):156 if any(all(d) for d in draw_list(data)):157 data.mark_interesting()158 l = draw_list(TestData.for_buffer(b))159 assert len(l) == 1160 assert len(l[0]) == 1161def test_draw_to_overrun():162 @run_to_buffer163 def x(data):164 d = (data.draw_bytes(1)[0] - 8) & 0xff165 data.draw_bytes(128 * d)166 if d >= 2:167 data.mark_interesting()168 assert x == hbytes([10]) + hbytes(128 * 2)169def test_can_navigate_to_a_valid_example():170 def f(data):171 i = int_from_bytes(data.draw_bytes(2))172 data.draw_bytes(i)173 data.mark_interesting()174 runner = TestRunner(f, settings=settings(175 max_examples=5000, max_iterations=10000,176 buffer_size=2,177 database=None,178 ))179 runner.run()180 assert runner.last_data.status == Status.INTERESTING181 return hbytes(runner.last_data.buffer)182def test_stops_after_max_iterations_when_generating():183 key = b'key'184 value = b'rubber baby buggy bumpers'185 max_iterations = 100186 db = ExampleDatabase(':memory:')187 db.save(key, value)188 seen = []189 def f(data):190 seen.append(data.draw_bytes(len(value)))191 data.mark_invalid()192 runner = TestRunner(f, settings=settings(193 max_examples=1, max_iterations=max_iterations,194 database=db,195 ), database_key=key)196 runner.run()197 assert len(seen) == max_iterations198 assert value in seen199def test_stops_after_max_iterations_when_reading():200 key = b'key'201 max_iterations = 1202 db = ExampleDatabase(':memory:')203 for i in range(10):204 db.save(key, hbytes([i]))205 seen = []206 def f(data):207 seen.append(data.draw_bytes(1))208 data.mark_invalid()209 runner = TestRunner(f, settings=settings(210 max_examples=1, max_iterations=max_iterations,211 database=db,212 ), database_key=key)213 runner.run()214 assert len(seen) == max_iterations215def test_stops_after_max_examples_when_reading():216 key = b'key'217 db = ExampleDatabase(':memory:')218 for i in range(10):219 db.save(key, hbytes([i]))220 seen = []221 def f(data):222 seen.append(data.draw_bytes(1))223 runner = TestRunner(f, settings=settings(224 max_examples=1,225 database=db,226 ), database_key=key)227 runner.run()228 assert len(seen) == 1229def test_stops_after_max_examples_when_generating():230 seen = []231 def f(data):232 seen.append(data.draw_bytes(1))233 runner = TestRunner(f, settings=settings(234 max_examples=1,235 database=None,236 ))237 runner.run()238 assert len(seen) == 1239@given(st.random_module())240@settings(max_shrinks=0, timeout=3, min_satisfying_examples=1)241def test_interleaving_engines(rnd):242 @run_to_buffer243 def x(data):244 rnd = Random(hbytes(data.draw_bytes(8)))245 def g(d2):246 while True:247 b = d2.draw_bytes(1)[0]248 result = data.draw_bytes(b)249 if 255 in result:250 d2.mark_interesting()251 if 0 in result:252 d2.mark_invalid()253 runner = TestRunner(g, random=rnd)254 runner.run()255 if runner.last_data.status == Status.INTERESTING:256 data.mark_interesting()257 assert x[8:].count(255) == 1258def test_run_with_timeout_while_shrinking():259 def f(data):260 time.sleep(0.1)261 x = data.draw_bytes(32)262 if any(x):263 data.mark_interesting()264 runner = TestRunner(f, settings=settings(database=None, timeout=0.2,))265 start = time.time()266 runner.run()267 assert time.time() <= start + 1268 assert runner.last_data.status == Status.INTERESTING269def test_run_with_timeout_while_boring():270 def f(data):271 time.sleep(0.1)272 runner = TestRunner(f, settings=settings(database=None, timeout=0.2,))273 start = time.time()274 runner.run()275 assert time.time() <= start + 1276 assert runner.last_data.status == Status.VALID277def test_max_shrinks_can_disable_shrinking():278 seen = set()279 def f(data):280 seen.add(hbytes(data.draw_bytes(32)))281 data.mark_interesting()282 runner = TestRunner(f, settings=settings(database=None, max_shrinks=0,))283 runner.run()284 assert len(seen) == 1285def test_phases_can_disable_shrinking():286 seen = set()287 def f(data):288 seen.add(hbytes(data.draw_bytes(32)))289 data.mark_interesting()290 runner = TestRunner(f, settings=settings(291 database=None, phases=(Phase.reuse, Phase.generate),292 ))293 runner.run()294 assert len(seen) == 1295def test_saves_data_while_shrinking():296 key = b'hi there'297 n = 5298 db = ExampleDatabase(':memory:')299 assert list(db.fetch(key)) == []300 seen = set()301 def f(data):302 x = data.draw_bytes(512)303 if sum(x) >= 5000 and len(seen) < n:304 seen.add(hbytes(x))305 if hbytes(x) in seen:306 data.mark_interesting()307 runner = TestRunner(308 f, settings=settings(database=db), database_key=key)309 runner.run()310 assert runner.last_data.status == Status.INTERESTING311 assert len(seen) == n312 in_db = set(db.fetch(key))313 assert in_db.issubset(seen)314 assert in_db == seen315def test_can_discard():316 n = 32317 @run_to_buffer318 def x(data):319 seen = set()320 while len(seen) < n:321 seen.add(hbytes(data.draw_bytes(1)))322 data.mark_interesting()323 assert len(x) == n324def test_erratic_draws():325 n = [0]326 @run_to_buffer327 def x(data):328 data.draw_bytes(n[0])329 data.draw_bytes(255 - n[0])330 if n[0] == 255:331 data.mark_interesting()332 else:333 n[0] += 1334def test_no_read_no_shrink():335 count = [0]336 @run_to_buffer337 def x(data):338 count[0] += 1339 data.mark_interesting()340 assert x == b''341 assert count == [1]342def test_garbage_collects_the_database():343 key = b'hi there'344 n = 200345 db = ExampleDatabase(':memory:')346 assert list(db.fetch(key)) == []347 seen = set()348 go = True349 def f(data):350 x = hbytes(data.draw_bytes(512))351 if not go:352 return353 if sum(x) >= 5000 and len(seen) < n:354 seen.add(x)355 if x in seen:356 data.mark_interesting()357 runner = TestRunner(358 f, settings=settings(database=db, max_shrinks=2 * n), database_key=key)359 runner.run()360 assert runner.last_data.status == Status.INTERESTING361 assert len(seen) == n362 assert set(db.fetch(key)) == seen363 go = False364 runner = TestRunner(365 f, settings=settings(database=db, max_shrinks=2 * n), database_key=key)366 runner.run()367 assert 0 < len(set(db.fetch(key))) < n368def test_variable_replacement():369 @run_to_buffer370 def x(data):371 for _ in range(5):372 data.start_example()373 c = 0374 while True:375 d = data.draw_bytes(1)[0]376 if not d:377 break378 c += d379 data.stop_example()380 if c < 1000:381 data.mark_invalid()382 data.mark_interesting()383 assert x == x[:x.index(0) + 1] * 5384@given(st.randoms(), st.random_module())385def test_maliciously_bad_generator(rnd, seed):386 rnd = Random()387 @run_to_buffer388 def x(data):389 for _ in range(rnd.randint(0, 100)):390 data.draw_bytes(rnd.randint(0, 10))391 if rnd.randint(0, 1):392 data.mark_invalid()393 else:...

Full Screen

Full Screen

__init__.py

Source:__init__.py Github

copy

Full Screen

1"""2@Author : Ailitonia3@Date : 2021/08/15 1:194@FileName : __init__.py.py5@Project : nonebot2_miya 6@Description : 7@GitHub : https://github.com/Ailitonia8@Software : PyCharm 9"""10from datetime import datetime11from nonebot import on_command, logger12from nonebot.plugin.export import export13from nonebot.typing import T_State14from nonebot.rule import to_me15from nonebot.permission import SUPERUSER16from nonebot.adapters.cqhttp.bot import Bot17from nonebot.adapters.cqhttp.event import MessageEvent, GroupMessageEvent, PrivateMessageEvent18from nonebot.adapters.cqhttp.permission import GROUP_ADMIN, GROUP_OWNER, PRIVATE_FRIEND19from nonebot.adapters.cqhttp.message import MessageSegment20from omega_miya.utils.omega_plugin_utils import init_export, init_processor_state, PicEncoder21from omega_miya.database import DBStatistic22from .utils import draw_statistics23# Custom plugin usage text24__plugin_custom_name__ = '统计信息'25__plugin_usage__ = r'''【Omega 插件使用统计】26查询插件使用统计信息27**Permission**28Friend Private29Command & Lv.1030or AuthNode31**AuthNode**32basic33**Usage**34/统计信息 [条件]'''35# Init plugin export36init_export(export(), __plugin_custom_name__, __plugin_usage__)37# 注册事件响应器38statistic = on_command(39 '统计信息',40 # 使用run_preprocessor拦截权限管理, 在default_state初始化所需权限41 state=init_processor_state(42 name='statistic',43 command=True,44 level=10),45 aliases={'插件统计', 'statistic'},46 permission=SUPERUSER | GROUP_ADMIN | GROUP_OWNER | PRIVATE_FRIEND,47 priority=10,48 block=True)49# 修改默认参数处理50@statistic.args_parser51async def parse(bot: Bot, event: MessageEvent, state: T_State):52 args = str(event.get_plaintext()).strip().lower().split()53 if not args:54 await statistic.reject('你似乎没有发送有效的参数呢QAQ, 请重新发送:')55 state[state["_current_key"]] = args[0]56 if state[state["_current_key"]] == '取消':57 await statistic.finish('操作已取消')58@statistic.handle()59async def handle_first_receive(bot: Bot, event: MessageEvent, state: T_State):60 args = str(event.get_plaintext()).strip().lower().split()61 if not args:62 state['condition'] = '本月'63 elif args and len(args) == 1:64 state['condition'] = args[0]65 else:66 await statistic.finish('参数错误QAQ')67@statistic.got('condition', prompt='请输入查询条件:\n【全部/本月/本年】')68async def handle_statistic(bot: Bot, event: MessageEvent, state: T_State):69 condition = state['condition']70 self_id = event.self_id71 now = datetime.now()72 if condition == '本月':73 start_time = datetime(year=now.year, month=now.month, day=1)74 elif condition == '本年':75 start_time = datetime(year=now.year, month=1, day=1)76 else:77 condition = '全部'78 start_time = None79 if isinstance(event, GroupMessageEvent):80 title = f'本群{condition}插件使用统计'81 group_id = event.group_id82 statistic_result = await DBStatistic(83 self_bot_id=self_id).get_group_statistic(group_id=group_id, start_time=start_time)84 elif isinstance(event, PrivateMessageEvent):85 title = f'用户{condition}插件使用统计'86 user_id = event.user_id87 statistic_result = await DBStatistic(88 self_bot_id=self_id).get_user_statistic(user_id=user_id, start_time=start_time)89 else:90 return91 if statistic_result.error:92 logger.error(f'查询统计信息失败, error: {statistic_result.info}')93 await statistic.finish('查询统计信息失败QAQ')94 draw_bytes = await draw_statistics(data=statistic_result.result, title=title)95 img_result = await PicEncoder.bytes_to_file(image=draw_bytes, folder_flag='statistic')96 if img_result.error:97 logger.error(f'生成统计图表失败, error: {img_result.info}')98 await statistic.finish('生成统计图表失败QAQ')99 await statistic.finish(MessageSegment.image(img_result.result))100admin_statistic = on_command(101 '全局统计信息',102 # 使用run_preprocessor拦截权限管理, 在default_state初始化所需权限103 rule=to_me(),104 state=init_processor_state(105 name='admin_statistic',106 command=True,107 level=10),108 aliases={'全局插件统计', 'total_stat'},109 permission=SUPERUSER,110 priority=10,111 block=True)112@admin_statistic.handle()113async def handle_admin_statistic(bot: Bot, event: MessageEvent, state: T_State):114 self_id = event.self_id115 statistic_result = await DBStatistic(self_bot_id=self_id).get_bot_statistic()116 if statistic_result.error:117 logger.error(f'查询全局统计信息失败, error: {statistic_result.info}')118 await statistic.finish('查询全局统计信息失败QAQ')119 title = f'Bot:{self_id} 全局插件使用统计'120 draw_bytes = await draw_statistics(data=statistic_result.result, title=title)121 img_result = await PicEncoder.bytes_to_file(image=draw_bytes, folder_flag='statistic')122 if img_result.error:123 logger.error(f'生成全局统计图表失败, error: {img_result.info}')124 await statistic.finish('生成全局统计图表失败QAQ')...

Full Screen

Full Screen

report.py

Source:report.py Github

copy

Full Screen

...21 'value' : float(line[5])22 }23 def draw_request(self, report):24 report.draw('sdb_request', ('SelectGet', 'Requests'), ('PutAttributes','Requests'), ('GetAttributes', 'Requests'), ('ListDomains', 'Requests'))25 def draw_bytes(self, report):26 report.draw('sdb_bytes', ('GetAttributes','EC2DataTransfer-In-Bytes'), ('SelectGet','EC2DataTransfer-Out-Bytes'), ('PutAttributes','EC2DataTransfer-Out-Bytes'),('ListDomains','DataTransfer-Out-Bytes'))27 def draw_all(self, report):28 self.draw_request(report)29 self.draw_bytes(report)30class S3(object):31 def draw_request(self, report):32 report.draw('s3_request', ('GetObject', 'Requests-Tier?'), ('PutObject', 'Requests-Tier?'), ('ListBucket', 'Requests-Tier?'), ('HeadObject', 'Requests-Tier?'))33 def draw_bytes(self, report):34 report.draw('s3_bytes', ('GetObject','DataTransfer-Out-Bytes'), ('PutObject','C3DataTransfer-In-Bytes'), ('ListBucket','DataTransfer-Out-Bytes'))35 def draw_objects(self, report):36 report.draw('s3_objects', ('StandardStorage','StorageObjectCount'))37 def draw_all(self, report):38 self.draw_request(report)39 self.draw_bytes(report)40 self.draw_objects(report)41 def filter(self, line):42 return {43 'service' : line[0],44 'operation': line[1],45 'usageType': line[2],46 'resource' : line[3],47 'start' : datetime.strptime(line[4], '%m/%d/%y %H:%M:%S'),48 'end' : datetime.strptime(line[5], '%m/%d/%y %H:%M:%S'),49 'value' : float(line[6])50 }51class SQS(object):52 def filter(self, line):53 return {54 'service' : line[0],55 'operation': line[1],56 'usageType': line[2],57 'start' : datetime.strptime(line[3], '%m/%d/%y %H:%M:%S'),58 'end' : datetime.strptime(line[4], '%m/%d/%y %H:%M:%S'),59 'value' : float(line[5])60 }61 def draw_request(self, report):62 report.draw('sqs_request', ('Receive','Requests-RBP'), ('Send','Requests-RBP'))63 def draw_bytes(self, report):64 report.draw('sqs_bytes', ('Receive','EC2DataTransfer-Out-Bytes'), ('Send','EC2DataTransfer-In-Bytes'))65 def draw_all(self, report):66 self.draw_request(report)67 self.draw_bytes(report)68class CloudFront(object):69 def filter(self, line):#Service, Operation, UsageType, Resource, StartTime, EndTime, UsageValue70 return {71 'service' : line[0],72 'operation': line[1],73 'usageType': line[2],74 'resource' : line[3],75 'start' : datetime.strptime(line[4], '%m/%d/%y %H:%M:%S'),76 'end' : datetime.strptime(line[5], '%m/%d/%y %H:%M:%S'),77 'value' : float(line[6])78 }79 def draw_request(self, report):80 report.draw('cf_request', ('GET','EU-Requests-Tier1'), ('GET','JP-Requests-Tier1'), ('GET','US-Requests-Tier1'), ('GET','AP-Requests-Tier1') )81 def draw_bytes(self, report):82 report.draw('cf_bytes', ('GET','EU-DataTransfer-Out-Bytes'), ('GET','JP-DataTransfer-Out-Bytes'), ('GET','US-DataTransfer-Out-Bytes'), ('GET','AP-DataTransfer-Out-Bytes'))83 def draw_all(self, report):84 self.draw_request(report)85 #self.draw_bytes(report)86class SES(object):87 def filter(self,line):#Service, Operation, UsageType, StartTime, EndTime, UsageValue88 return {89 'service' : line[0],90 'operation': line[1],91 'usageType': line[2],92 'start' : datetime.strptime(line[3], '%m/%d/%y %H:%M:%S'),93 'end' : datetime.strptime(line[4], '%m/%d/%y %H:%M:%S'),94 'value' : float(line[5])95 }96 def draw_request(self, report):97 report.draw('ses_request', ('SendEmail','Requests'), ('SendEmail','Recipients'), ('SendEmail','Requests-EC2'))98 def draw_bytes(self, report):99 report.draw('ses_bytes', ('SendEmail','DataTransfer-In-Bytes'), ('SendEmail','DataTransfer-Out-Bytes'), ('SendEmail', 'EC2DataTransfer-In-Bytes'))100 def draw_all(self, report):101 self.draw_request(report)102 self.draw_bytes(report)103 104filters = {105 'AmazonS3': S3,106 'AmazonSimpleDB': SDB,107 'AWSQueueService': SQS,108 'AmazonCloudFront': CloudFront,109 'AmazonSES': SES110}111class Report:112 def __init__(self, f):113 self.f = f114 self.filters = {}115 def __iter__(self):116 r = csv.reader(open(self.f), delimiter=',')...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run hypothesis automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful