How to use exec_in_container method in testcontainers-python

Best Python code snippets using testcontainers-python

test_nginx.py

Source:test_nginx.py Github

copy

Full Screen

import http.server
import logging
import os
from pathlib import Path
import shutil
import socket
import stat
import subprocess
from threading import Thread
import time

import pytest
import requests

from .util import *


def port():
    """Get an ephemeral port number."""
    # https://unix.stackexchange.com/a/132524
    # The context manager closes the socket even if bind() raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(('', 0))
        return s.getsockname()[1]


# nginx snippet mounted into the container as /etc/nginx/custom/webdav.conf.
# Doubled braces are literal braces after str.format().
config = """
listen 0.0.0.0:{nginx_port};
location /auth {{
  internal;
  proxy_pass http://127.0.0.1:{app_port}/;
  proxy_connect_timeout {timeout}s;
  proxy_read_timeout {timeout}s;
  proxy_send_timeout {timeout}s;
  proxy_set_header Content-Length "";
  proxy_set_header X-Original-URI $request_uri;
  proxy_set_header X-Original-Method $request_method;
}}
"""

# nginx server block mounted as /etc/nginx/sites-enabled/health.conf.
config_health = """
server {{
  listen 0.0.0.0:{health_port};
  location = /basic_status {{
    stub_status;
  }}
}}
"""


@pytest.fixture(scope="session")
def fake_server():
    """Run a local HTTP server that answers every GET with fixed identity headers.

    Yields the port the server listens on. The server is shut down and its
    listening socket closed when the session ends.
    """
    class Handler(http.server.BaseHTTPRequestHandler):
        def do_GET(self):
            # The status line is written by hand; the headers are then
            # buffered by send_header() and flushed by end_headers().
            self.wfile.write(b'HTTP/1.0 200 OK\r\n')
            self.send_header('REMOTE_USER', 'user')
            self.send_header('X_UID', '123')
            self.send_header('X_GID', '456')
            self.send_header('X_GROUPS', '456,789')
            self.end_headers()
            self.wfile.write(b'\r\n')

    app_port = port()
    s = http.server.HTTPServer(('', app_port), Handler)
    t = Thread(target=s.serve_forever, daemon=True)
    t.start()
    try:
        yield app_port
    finally:
        # Stop serve_forever() and release the listening socket.
        s.shutdown()
        s.server_close()


@pytest.fixture(scope="session")
def nginx(tmp_path_factory, fake_server):
    """Build the nginx image, run it with host networking, and yield its handles.

    Yields a dict with:
      - 'data': host directory mounted at /mnt/data in the container
      - 'nginx_port', 'health_port', 'app_port': the ports in use
      - 'exec_in_container': callable running ``docker exec`` with the given
        arguments and returning the command's stdout as bytes
    """
    workdir = Path(__file__).parent.parent
    subprocess.run(['docker', 'build', '-t', 'wipac/keycloak-http-auth:testing',
                    '-f', 'Dockerfile_nginx', f'{workdir}'], check=True, cwd=workdir)

    test_volume = tmp_path_factory.mktemp('cache')
    nginx_config = tmp_path_factory.mktemp('conf') / 'config.conf'
    nginx_health_config = tmp_path_factory.mktemp('conf') / 'health.conf'
    nginx_port = port()
    health_port = port()
    app_port = fake_server
    nginx_config.write_text(config.format(nginx_port=nginx_port, app_port=app_port, timeout=1))
    nginx_health_config.write_text(config_health.format(health_port=health_port))

    with subprocess.Popen(['docker', 'run', '--rm', '--network=host', '--name', 'test_nginx_integration',
                           '-v', f'{nginx_config}:/etc/nginx/custom/webdav.conf:ro',
                           '-v', f'{nginx_health_config}:/etc/nginx/sites-enabled/health.conf:ro',
                           '-v', f'{test_volume}:/mnt/data:rw', 'wipac/keycloak-http-auth:testing']) as p:

        def fn(*args):
            return subprocess.run(['docker', 'exec', 'test_nginx_integration']+list(args),
                                  check=True, cwd=workdir, capture_output=True).stdout

        # The readiness probe sits inside the try so that a failed start-up
        # still terminates the container; otherwise Popen.__exit__ would
        # wait on it forever.
        try:
            # wait for server to come up
            for i in range(10):
                try:
                    socket.create_connection(('localhost', nginx_port), 0.1).close()
                except ConnectionRefusedError:
                    if i >= 9:
                        raise
                    time.sleep(.1)
                else:
                    break
            yield {
                'data': test_volume,
                'nginx_port': nginx_port,
                'health_port': health_port,
                'app_port': app_port,
                'exec_in_container': fn,
            }
        finally:
            p.terminate()


@pytest.fixture(autouse=True, scope='function')
def clear_test_volume(nginx):
    """Restore ownership of the data volume and empty it after every test."""
    nginx["data"].chmod(0o777)
    # Read the owner before entering the try block so the cleanup below never
    # runs with uid/gid undefined.
    uid, gid = (int(x) for x in nginx['exec_in_container']('stat', '-c', '%u %g', '/mnt/data').split())
    logging.info('uid:gid starts as %s:%s', uid, gid)
    try:
        yield
    finally:
        logging.info('cleaning dir')
        # Tests may chown/chgrp files inside the container; hand them back to
        # the original owner so the host user can delete them.
        nginx['exec_in_container']('chown', '-R', f'{uid}:{gid}', '/mnt/data')
        for child in nginx['data'].iterdir():
            if child.is_dir():
                shutil.rmtree(child)
            else:
                child.unlink()
        logging.debug('ls: %r', nginx['exec_in_container']('ls', '-al', '/mnt/data'))


def test_health(nginx):
    r = requests.get(f'http://localhost:{nginx["health_port"]}/basic_status')
    assert r.status_code == 200
    assert 'Active connections' in r.text


def test_index(nginx):
    r = requests.get(f'http://localhost:{nginx["nginx_port"]}/')
    assert r.status_code == 200


def test_missing(nginx):
    r = requests.get(f'http://localhost:{nginx["nginx_port"]}/missing')
    assert r.status_code == 404


def test_read(nginx):
    data = b'foo bar baz'
    nginx["data"].chmod(0o777)
    (nginx["data"] / "test").write_bytes(data)
    r = requests.get(f'http://localhost:{nginx["nginx_port"]}/data/test')
    r.raise_for_status()
    assert r.content == data


def test_read_bad_perms(nginx):
    data = b'foo bar baz'
    nginx["data"].chmod(0o777)
    (nginx["data"] / "test").write_bytes(data)
    (nginx["data"] / "test").chmod(0o600)
    r = requests.get(f'http://localhost:{nginx["nginx_port"]}/data/test')
    with pytest.raises(requests.HTTPError):
        r.raise_for_status()


def test_read_group(nginx):
    data = b'foo bar baz'
    nginx["data"].chmod(0o777)
    (nginx["data"] / "test").write_bytes(data)
    (nginx["data"] / "test").chmod(0o660)
    nginx['exec_in_container']('chgrp', '789', '/mnt/data/test')
    r = requests.get(f'http://localhost:{nginx["nginx_port"]}/data/test')
    r.raise_for_status()
    assert r.content == data


def test_read_bad_group(nginx):
    data = b'foo bar baz'
    nginx["data"].chmod(0o777)
    (nginx["data"] / "test").write_bytes(data)
    (nginx["data"] / "test").chmod(0o660)
    nginx['exec_in_container']('chgrp', '12345', '/mnt/data/test')
    r = requests.get(f'http://localhost:{nginx["nginx_port"]}/data/test')
    with pytest.raises(requests.HTTPError):
        r.raise_for_status()


def test_write(nginx):
    data = b'foo bar baz'
    nginx["data"].chmod(0o777)
    r = requests.put(f'http://localhost:{nginx["nginx_port"]}/data/test', data=data)
    r.raise_for_status()
    out = nginx['exec_in_container']('cat', '/mnt/data/test')
    assert out == data
    out = nginx['exec_in_container']('stat', '-c', '%u %g', '/mnt/data/test')
    assert out.strip() == b'123 456'


def test_write_group(nginx):
    data = b'foo bar baz'
    nginx["data"].chmod(0o770)
    nginx['exec_in_container']('chgrp', '789', '/mnt/data')
    out = nginx['exec_in_container']('stat', '-c', '%A', '/mnt/data')
    assert out.strip() == b'drwxrwx---'
    r = requests.put(f'http://localhost:{nginx["nginx_port"]}/data/test', data=data)
    r.raise_for_status()
    out = nginx['exec_in_container']('cat', '/mnt/data/test')
    assert out == data
    out = nginx['exec_in_container']('stat', '-c', '%A %u %g', '/mnt/data/test')
    assert out.strip() == b'-rw-rw---- 123 456'


def test_write_group_sticky(nginx):
    data = b'foo bar baz'
    nginx["data"].chmod(0o770)
    nginx['exec_in_container']('chgrp', '789', '/mnt/data')
    nginx['exec_in_container']('chmod', 'g+s', '/mnt/data')
    out = nginx['exec_in_container']('stat', '-c', '%A', '/mnt/data')
    assert out.strip() == b'drwxrws---'
    r = requests.put(f'http://localhost:{nginx["nginx_port"]}/data/test', data=data)
    r.raise_for_status()
    out = nginx['exec_in_container']('cat', '/mnt/data/test')
    assert out == data
    out = nginx['exec_in_container']('stat', '-c', '%A %u %g', '/mnt/data/test')
    assert out.strip() == b'-rw-rw---- 123 789'


def test_write_subdir(nginx):
    data = b'foo bar baz'
    nginx["data"].chmod(0o770)
    nginx['exec_in_container']('chgrp', '789', '/mnt/data')
    nginx['exec_in_container']('chmod', 'g+s', '/mnt/data')
    out = nginx['exec_in_container']('stat', '-c', '%A', '/mnt/data')
    assert out.strip() == b'drwxrws---'
    r = requests.put(f'http://localhost:{nginx["nginx_port"]}/data/sub1/sub2/test', data=data)
    r.raise_for_status()
    out = nginx['exec_in_container']('cat', '/mnt/data/sub1/sub2/test')
    assert out == data
    out = nginx['exec_in_container']('stat', '-c', '%A %u %g', '/mnt/data/sub1/sub2/test')
    assert out.strip() == b'-rw-rw---- 123 789'


def test_write_bad_group(nginx):
    data = b'foo bar baz'
    nginx["data"].chmod(0o770)
    nginx['exec_in_container']('chown', '12345:12345', '/mnt/data')
    out = nginx['exec_in_container']('stat', '-c', '%A', '/mnt/data')
    assert out.strip() == b'drwxrwx---'
    r = requests.put(f'http://localhost:{nginx["nginx_port"]}/data/test', data=data)
    with pytest.raises(requests.HTTPError):
        r.raise_for_status()
    # `stat` exits non-zero when the file is absent; check=True turns that
    # into CalledProcessError.
    with pytest.raises(subprocess.CalledProcessError):
        out = nginx['exec_in_container']('stat', '/mnt/data/test')
        logging.warning('stat output: %r', out)
    #f = (nginx["data"] / "test")
    #assert not f.exists()


def test_write_bad_perms(nginx):
    data = b'foo bar baz'
    nginx["data"].chmod(0o555)
    r = requests.put(f'http://localhost:{nginx["nginx_port"]}/data/test', data=data)
    with pytest.raises(requests.HTTPError):
        r.raise_for_status()
    f = (nginx["data"] / "test")
    assert not f.exists()


def test_delete(nginx):
    data = b'foo bar baz'
    nginx["data"].chmod(0o777)
    r = requests.put(f'http://localhost:{nginx["nginx_port"]}/data/test', data=data)
    r.raise_for_status()
    r = requests.delete(f'http://localhost:{nginx["nginx_port"]}/data/test')
    r.raise_for_status()
    f = (nginx["data"] / "test")
    assert not f.exists()


def test_mkdir(nginx):
    data = b'foo bar baz'
    nginx["data"].chmod(0o777)
    r = requests.request('MKCOL', f'http://localhost:{nginx["nginx_port"]}/data/test/')
    r.raise_for_status()
    f = (nginx["data"] / "test")
    assert f.is_dir()
    out = nginx['exec_in_container']('stat', '-c', '%u %g', '/mnt/data/test')
    # ... (the remainder of the file is truncated in this excerpt)

Full Screen

Full Screen

daisyHatUsbAttach.py

Source:daisyHatUsbAttach.py Github

copy

Full Screen

# ... (lines 1-5 of the original script are not shown in this excerpt)
container_name = "daisyhat-container"


def log(message):
    """Print *message* and also send it to syslog."""
    print(message)
    syslog.syslog(message)


def exec_in_container(cmd):
    """Run *cmd* through ``bash -c`` inside the container and return its stdout.

    The command is passed to ``docker exec`` as an argument list, not as a
    host-shell string, so quotes or shell metacharacters in *cmd* are seen
    only by the bash inside the container.

    Raises:
        subprocess.CalledProcessError: if ``docker exec`` exits non-zero.
    """
    return subprocess.check_output(
        ["docker", "exec", container_name, "bash", "-c", cmd],
        universal_newlines=True,
    )


root_dev = "/dev/bus/usb"
for bus_num in os.listdir(root_dev):
    bus_path = os.path.join(root_dev, bus_num)
    # `ls` output ends with a newline, so the split list contains an empty
    # string; the `if device` check below skips it.
    container_devices = exec_in_container("ls " + bus_path).split("\n")
    host_devices = os.listdir(bus_path)
    # remove dead links
    for device in container_devices:
        if device and device not in host_devices:
            exec_in_container("rm " + os.path.join(bus_path, device))
    # add new links
    for device in host_devices:
        if device not in container_devices:
            stat = os.stat(os.path.join(bus_path, device))
            cmd = "mknod {device} c {major} {minor}".format(
                device=os.path.join(bus_path, device),
                major=os.major(stat.st_rdev),
                minor=os.minor(stat.st_rdev)
            )
            # ... (the rest of the script is not shown in this excerpt)

Full Screen

Full Screen

docker-usb-sync.py

Source:docker-usb-sync.py Github

copy

Full Screen

# ... (lines 1-3 of the original script are not shown in this excerpt)
import os


def log(message):
    """Print *message* and also send it to syslog."""
    print(message)
    syslog.syslog(message)


def exec_in_container(cmd, name="container_name"):
    """Run *cmd* through ``bash -c`` inside container *name* and return its stdout.

    The command is passed to ``docker exec`` as an argument list, not as a
    host-shell string, so quotes or shell metacharacters in *cmd* are seen
    only by the bash inside the container.

    Raises:
        subprocess.CalledProcessError: if ``docker exec`` exits non-zero.
    """
    return subprocess.check_output(
        ["docker", "exec", name, "bash", "-c", cmd],
        universal_newlines=True,
    )


root_dev = "/dev/bus/usb"
for bus_num in os.listdir(root_dev):
    bus_path = os.path.join(root_dev, bus_num)
    # `ls` output ends with a newline, so the split list contains an empty
    # string; the `if device` check below skips it.
    container_devices = exec_in_container("ls " + bus_path).split("\n")
    host_devices = os.listdir(bus_path)
    # remove dead links
    for device in container_devices:
        if device and device not in host_devices:
            exec_in_container("rm " + os.path.join(bus_path, device))
    # add new links
    for device in host_devices:
        if device not in container_devices:
            stat = os.stat(os.path.join(bus_path, device))
            cmd = "mknod {device} c {major} {minor}".format(
                device=os.path.join(bus_path, device),
                major=os.major(stat.st_rdev),
                minor=os.minor(stat.st_rdev)
            )
            # ... (the rest of the script is not shown in this excerpt)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run testcontainers-python automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful