Best Python code snippet using devtools-proxy_python
test_socket.py
Source:test_socket.py
"""Tests to exercise socket-based read and write operations in the Tracker."""

import os
import subprocess
import sys
import textwrap
import time
from contextlib import contextmanager
from pathlib import Path
from typing import Iterator
from typing import Set
from typing import Tuple

import pytest

from memray import AllocatorType
from memray import SocketReader
from tests.utils import filter_relevant_allocations

# Seconds to wait for the child process to exit before killing it.
TIMEOUT = 5
# Size in bytes passed to every ``valloc`` call made by the child programs.
ALLOCATION_SIZE = 1234
# Number of allocators used by ALLOCATE_MANY_THEN_SNAPSHOT_THEN_FREE_MANY.
MULTI_ALLOCATION_COUNT = 10

#
# Test helpers
#

# Skeleton of the script run in the child interpreter.  It expects three
# command line arguments: the port to connect to and the paths of the two
# FIFOs used to synchronise with the parent (see run_till_snapshot_point).
# ``{body}`` is replaced with one of the program snippets defined below.
_SCRIPT_TEMPLATE = """
import sys
from memray._test import MemoryAllocator
from memray._memray import SocketDestination
from memray._memray import Tracker
# Sanity checks
port = int(sys.argv[1])
assert sys.argv[2].endswith("allocations_made.event")
assert sys.argv[3].endswith("snapshot_taken.event")
def get_tracker():
    return Tracker(destination=SocketDestination(server_port=port))
def snapshot_point():
    print("[child] Notifying allocations made")
    with open(sys.argv[2], "w") as allocations_made:
        allocations_made.write("done")
    print("[child] Waiting on snapshot taken")
    with open(sys.argv[3], "r") as snapshot_taken:
        response = snapshot_taken.read()
    assert response == "done"
    print("[child] Continuing execution")
{body}
"""

# Child program: allocate, free, and only then pause at the snapshot point.
ALLOCATE_THEN_FREE_THEN_SNAPSHOT = textwrap.dedent(
    f"""
    allocator = MemoryAllocator()
    with get_tracker():
        allocator.valloc({ALLOCATION_SIZE})
        allocator.free()
        snapshot_point()
    """
)

# Child program: allocate, pause at the snapshot point, then free.
ALLOCATE_THEN_SNAPSHOT_THEN_FREE = textwrap.dedent(
    f"""
    allocator = MemoryAllocator()
    with get_tracker():
        allocator.valloc({ALLOCATION_SIZE})
        snapshot_point()
        allocator.free()
    """
)

# Child program: like ALLOCATE_THEN_SNAPSHOT_THEN_FREE, but with
# MULTI_ALLOCATION_COUNT allocators all allocating from the same line.
ALLOCATE_MANY_THEN_SNAPSHOT_THEN_FREE_MANY = textwrap.dedent(
    f"""
    allocators = [MemoryAllocator() for _ in range({MULTI_ALLOCATION_COUNT})]
    with get_tracker():
        for allocator in allocators:
            allocator.valloc({ALLOCATION_SIZE})
        snapshot_point()
        for allocator in allocators:
            allocator.free()
    """
)


@contextmanager
def run_till_snapshot_point(
    program: str,
    *,
    reader: SocketReader,
    tmp_path: Path,
    free_port: int,
) -> Iterator[None]:
    """Run ``program`` in a child interpreter and pause it at its snapshot point.

    Two FIFOs are created under ``tmp_path`` and ``program`` is spliced into
    ``_SCRIPT_TEMPLATE`` and started with ``python -c``.  ``reader`` is then
    entered and this function blocks until the child reports, through the
    first FIFO, that it has reached ``snapshot_point()``.  Control is yielded
    to the caller at that moment; when the caller's ``with`` body finishes,
    the child is released through the second FIFO and ``reader`` is exited.

    Args:
        program: Source code substituted for ``{body}`` in the child script.
        reader: The ``SocketReader`` to enter for the duration of the run.
        tmp_path: Directory in which the two FIFOs are created.  It must not
            already contain them, as ``os.mkfifo`` is called unconditionally.
        free_port: Port number passed to the child as its first argument.

    Raises:
        AssertionError: If the child sends an unexpected message or exits
            with a non-zero status.
        subprocess.TimeoutExpired: If the child has not exited ``TIMEOUT``
            seconds after the reader was closed; the child is killed first.
    """
    allocations_made = tmp_path / "allocations_made.event"
    snapshot_taken = tmp_path / "snapshot_taken.event"
    os.mkfifo(allocations_made)
    os.mkfifo(snapshot_taken)

    script = _SCRIPT_TEMPLATE.format(body=program)
    proc = subprocess.Popen(
        [
            sys.executable,
            "-c",
            script,
            str(free_port),
            allocations_made,
            snapshot_taken,
        ]
    )

    try:
        with reader:
            print("[parent] Waiting on allocations made")
            with open(allocations_made, "r") as f1:
                assert f1.read() == "done"

            print("[parent] Deferring to caller")
            # Wait a bit of time, for background thread to receive + process the records.
            time.sleep(0.1)
            yield

            print("[parent] Notifying program to continue")
            with open(snapshot_taken, "w") as f2:
                f2.write("done")
            print("[parent] Will close socket reader now.")
    finally:
        print("[parent] Waiting on child to exit.")
        try:
            assert proc.wait(timeout=TIMEOUT) == 0
        except subprocess.TimeoutExpired:
            print("[parent] Killing child, after timeout.")
            proc.kill()
            # Reap the killed child so that it does not linger as a zombie.
            proc.wait()
            raise


#
# Actual tests
#


class TestSocketReaderErrorHandling:
    """Checks of ``SocketReader`` when it is used outside its context or re-entered."""

    @pytest.mark.valgrind
    def test_get_current_snapshot_raises_before_context(self, free_port: int) -> None:
        """A snapshot requested before entering the reader yields nothing."""
        # GIVEN
        reader = SocketReader(port=free_port)

        # WHEN / THEN
        with pytest.raises(StopIteration):
            next(reader.get_current_snapshot(merge_threads=False))

    def test_get_is_active_after_context(self, free_port: int, tmp_path: Path) -> None:
        """``is_active`` is False once the reader's context has been exited."""
        # GIVEN
        reader = SocketReader(port=free_port)
        program = ALLOCATE_THEN_SNAPSHOT_THEN_FREE

        # WHEN
        with run_till_snapshot_point(
            program,
            reader=reader,
            tmp_path=tmp_path,
            free_port=free_port,
        ):
            pass

        # THEN
        assert reader.is_active is False

    @pytest.mark.valgrind
    def test_get_current_snapshot_raises_after_context(
        self, free_port: int, tmp_path: Path
    ) -> None:
        """A snapshot requested after the reader was exited yields nothing."""
        # GIVEN
        reader = SocketReader(port=free_port)
        program = ALLOCATE_THEN_SNAPSHOT_THEN_FREE

        # WHEN
        with run_till_snapshot_point(
            program,
            reader=reader,
            tmp_path=tmp_path,
            free_port=free_port,
        ):
            pass

        # THEN
        with pytest.raises(StopIteration):
            next(reader.get_current_snapshot(merge_threads=False))

    @pytest.mark.valgrind
    def test_get_current_snapshot_first_yield_after_context_raises(
        self, free_port: int, tmp_path: Path
    ) -> None:
        """A snapshot created inside the context but first consumed after it yields nothing."""
        # GIVEN
        reader = SocketReader(port=free_port)
        program = ALLOCATE_THEN_SNAPSHOT_THEN_FREE

        # WHEN
        with run_till_snapshot_point(
            program,
            reader=reader,
            tmp_path=tmp_path,
            free_port=free_port,
        ):
            # Only create the iterator here; it is advanced after the context.
            snapshot = reader.get_current_snapshot(merge_threads=False)

        # THEN
        with pytest.raises(StopIteration):
            next(snapshot)

    @pytest.mark.valgrind
    def test_nested_context_is_diallowed(self, free_port: int, tmp_path: Path) -> None:
        """Entering the reader while it is already entered raises ``ValueError``."""
        # GIVEN
        reader = SocketReader(port=free_port)
        program = ALLOCATE_THEN_FREE_THEN_SNAPSHOT

        # WHEN
        with run_till_snapshot_point(
            program,
            reader=reader,
            tmp_path=tmp_path,
            free_port=free_port,
        ):
            # THEN
            with pytest.raises(
                ValueError, match="Can not enter (.*)context (.*)more than once"
            ):
                with reader:
                    pass


class TestSocketReaderAccess:
    """Checks of the data exposed by ``SocketReader`` while a child is being tracked."""

    @pytest.mark.valgrind
    def test_empty_snapshot_after_free(self, free_port: int, tmp_path: Path) -> None:
        """No relevant allocation is reported when the memory was already freed."""
        # GIVEN
        reader = SocketReader(port=free_port)
        program = ALLOCATE_THEN_FREE_THEN_SNAPSHOT

        # WHEN
        with run_till_snapshot_point(
            program,
            reader=reader,
            tmp_path=tmp_path,
            free_port=free_port,
        ):
            unfiltered_snapshot = list(reader.get_current_snapshot(merge_threads=False))

        # THEN
        snapshot = list(filter_relevant_allocations(unfiltered_snapshot))
        assert snapshot == []

    @pytest.mark.valgrind
    def test_single_allocation_snapshot(self, free_port: int, tmp_path: Path) -> None:
        """One live ``valloc`` shows up as one record of ``ALLOCATION_SIZE`` bytes."""
        # GIVEN
        reader = SocketReader(port=free_port)
        program = ALLOCATE_THEN_SNAPSHOT_THEN_FREE

        # WHEN
        with run_till_snapshot_point(
            program,
            reader=reader,
            tmp_path=tmp_path,
            free_port=free_port,
        ):
            unfiltered_snapshot = list(reader.get_current_snapshot(merge_threads=False))

        # THEN
        snapshot = list(filter_relevant_allocations(unfiltered_snapshot))
        assert len(snapshot) == 1

        allocation = snapshot[0]
        assert allocation.size == ALLOCATION_SIZE * 1
        assert allocation.allocator == AllocatorType.VALLOC

        symbol, filename, lineno = allocation.stack_trace()[0]
        assert symbol == "valloc"
        assert filename == "src/memray/_memray_test_utils.pyx"
        assert 0 < lineno < 200

    @pytest.mark.valgrind
    def test_multi_allocation_snapshot(self, free_port: int, tmp_path: Path) -> None:
        """Several live ``valloc`` calls show up as one record with the summed size."""
        # GIVEN
        reader = SocketReader(port=free_port)
        program = ALLOCATE_MANY_THEN_SNAPSHOT_THEN_FREE_MANY

        # WHEN
        with run_till_snapshot_point(
            program,
            reader=reader,
            tmp_path=tmp_path,
            free_port=free_port,
        ):
            unfiltered_snapshot = list(reader.get_current_snapshot(merge_threads=False))

        # THEN
        snapshot = list(filter_relevant_allocations(unfiltered_snapshot))
        assert len(snapshot) == 1

        allocation = snapshot[0]
        # Tie the expected total to the constant that drives the child program
        # instead of repeating its value as a literal.
        assert allocation.size == ALLOCATION_SIZE * MULTI_ALLOCATION_COUNT
        assert allocation.allocator == AllocatorType.VALLOC

        symbol, filename, lineno = allocation.stack_trace()[0]
        assert symbol == "valloc"
        assert filename == "src/memray/_memray_test_utils.pyx"
        assert 0 < lineno < 200

    @pytest.mark.valgrind
    def test_multiple_context_entries_does_not_crash(
        self, free_port: int, tmp_path: Path
    ) -> None:
        """The same reader can be entered again after a first run has completed."""
        # GIVEN
        reader = SocketReader(port=free_port)
        program = ALLOCATE_THEN_FREE_THEN_SNAPSHOT
        # Each run creates its own pair of FIFOs, so give each one a directory.
        tmp_path_one = tmp_path / "one"
        tmp_path_one.mkdir()
        tmp_path_two = tmp_path / "two"
        tmp_path_two.mkdir()

        # WHEN
        with run_till_snapshot_point(
            program,
            reader=reader,
            tmp_path=tmp_path_one,
            free_port=free_port,
        ):
            pass

        # THEN
        with run_till_snapshot_point(
            program,
            reader=reader,
            tmp_path=tmp_path_two,
            free_port=free_port,
        ):
            pass

    @pytest.mark.valgrind
    def test_command_line(self, free_port: int, tmp_path: Path) -> None:
        """``command_line`` reports the child's ``-c`` invocation, including the port."""
        # GIVEN
        reader = SocketReader(port=free_port)
        program = ALLOCATE_THEN_FREE_THEN_SNAPSHOT

        # WHEN
        with run_till_snapshot_point(
            program,
            reader=reader,
            tmp_path=tmp_path,
            free_port=free_port,
        ):
            command_line = reader.command_line

        # THEN
        assert command_line
        assert command_line.startswith("-c")  # these samples run with `python -c`
        assert str(free_port) in command_line

    @pytest.mark.valgrind
    def test_reading_allocations_while_reading_stack_traces(
        self, free_port: int, tmp_path: Path
    ) -> None:
        """This test exists mainly to give valgrind/helgrind the opportunity to
        see races between fetching stack traces and the background thread
        fetching allocations. Therefore no interesting asserts are done in
        the test itself.

        The test will spawn a tracked process that is constantly creating new
        functions and code objects so the process is constantly sending new
        frames to the SocketReader, triggering tons of updates of the internal
        structures inside. We want to check that if we get the stack traces
        inside the allocations it won't crash/race with fetching new allocations
        """
        # GIVEN
        script = textwrap.dedent(
            f"""\
            from memray._test import MemoryAllocator
            from memray._memray import SocketDestination
            from memray._memray import Tracker
            from itertools import count
            import textwrap
            def foo(allocator, n):
                fname = f"blech_{{n}}"
                namespace = {{}}
                exec(textwrap.dedent(f'''
                    def {{fname}}(allocator):
                        allocator.valloc(1024)
                        allocator.free()
                '''), namespace, namespace)
                func = namespace[fname]
                func(allocator)
            with Tracker(destination=SocketDestination(server_port={free_port})):
                allocator = MemoryAllocator()
                for n in count(0):
                    foo(allocator, n)
            """
        )
        code = [
            sys.executable,
            "-c",
            script,
        ]
        traces: Set[Tuple[str, str, int]] = set()
        MAX_TRACES = 10

        # WHEN
        with subprocess.Popen(code) as proc, SocketReader(port=free_port) as reader:
            while len(traces) < MAX_TRACES:
                for allocation in reader.get_current_snapshot(merge_threads=False):
                    traces.update(allocation.stack_trace())
            # The child loops forever, so it has to be stopped explicitly.
            proc.terminate()

        # THEN
        assert len(traces) >= MAX_TRACES

# NOTE: the listing this file was recovered from ends with an ellipsis here;
# any content that followed is not shown.
test_file.py
Source:test_file.py
...65 handler = functools.partial(TestRequestHandler, directory=directory)66 s = http.server.HTTPServer(("127.0.0.1", port), handler)67 s.serve_forever()68@pytest.fixture(scope="session")69def free_port():70 """71 Utility fixture to grab a free port for the web server72 """73 with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:74 s.bind(("", 0))75 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)76 return s.getsockname()[1]77@pytest.fixture(autouse=True, scope="session")78def server(free_port, web_root):79 """80 Web server fixture81 """82 p = multiprocessing.Process(target=serve, args=(free_port, web_root))83 p.start()...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!