Best Python code snippet using localstack_python
test_docker.py
Source: test_docker.py
# NOTE: this excerpt starts mid-file. The imports and the beginning of the
# preceding statement (whose visible tail is "]," followed by ")") are not shown.

LOG = logging.getLogger(__name__)

# prefix shared by all containers created through _random_container_name()
container_name_prefix = "lst_test_"


def _random_container_name() -> str:
    """Return a container name made of the shared test prefix plus ``short_uid()``."""
    return f"{container_name_prefix}{short_uid()}"


@pytest.fixture
def dummy_container(create_container):
    """Returns a container that is created but not started"""
    return create_container("alpine", command=["sh", "-c", "while true; do sleep 1; done"])


@pytest.fixture
def create_container(docker_client: ContainerClient, create_network):
    """
    Uses the factory as fixture pattern to wrap ContainerClient.create_container as a factory that
    removes the containers after the fixture is cleaned up.

    Depends on create network for correct cleanup order
    """
    containers = []

    def _create_container(image_name: str, **kwargs):
        # callers may pass their own name; otherwise a random test name is used
        kwargs.setdefault("name", _random_container_name())
        cid = docker_client.create_container(image_name, **kwargs).strip()
        containers.append(cid)
        return ContainerInfo(cid, kwargs["name"])  # FIXME name should come from docker_client

    yield _create_container

    # teardown: best-effort removal of everything the factory created
    for c in containers:
        try:
            docker_client.remove_container(c)
        except Exception:
            LOG.warning("failed to remove test container %s", c)


@pytest.fixture
def create_network():
    """
    Uses the factory as fixture pattern to wrap the creation of networks as a factory that
    removes the networks after the fixture is cleaned up.
    """
    networks = []

    def _create_network(network_name: str):
        network_id = safe_run([config.DOCKER_CMD, "network", "create", network_name]).strip()
        networks.append(network_id)
        return network_id

    yield _create_network

    for network in networks:
        try:
            LOG.debug("Removing network %s", network)
            safe_run([config.DOCKER_CMD, "network", "remove", network])
        except CalledProcessError:
            # cleanup stays best-effort, but leave a trace instead of failing silently
            LOG.debug("failed to remove test network %s", network, exc_info=True)


class TestDockerClient:
    """Tests for ``ContainerClient`` that use the ``alpine`` image."""

    def test_container_lifecycle_commands(self, docker_client: ContainerClient):
        """Create, start, stop and remove a container, checking the status after each step."""
        container_name = _random_container_name()
        output = docker_client.create_container(
            "alpine",
            name=container_name,
            command=["sh", "-c", "for i in `seq 30`; do sleep 1; echo $i; done"],
        )
        container_id = output.strip()
        assert container_id

        try:
            docker_client.start_container(container_id)
            assert DockerContainerStatus.UP == docker_client.get_container_status(container_name)

            docker_client.stop_container(container_id)
            assert DockerContainerStatus.DOWN == docker_client.get_container_status(container_name)
        finally:
            docker_client.remove_container(container_id)

        assert DockerContainerStatus.NON_EXISTENT == docker_client.get_container_status(
            container_name
        )

    def test_create_container_remove_removes_container(
        self, docker_client: ContainerClient, create_container
    ):
        """A container created with ``remove=True`` is no longer listed after it has run."""
        info = create_container("alpine", remove=True, command=["echo", "foobar"])
        container_filter = f"id={info.container_id}"

        # make sure it was correctly created
        assert 1 == len(docker_client.list_containers(container_filter))

        # start the container
        output, _ = docker_client.start_container(info.container_id, attach=True)
        output = output.decode(config.DEFAULT_ENCODING)
        assert "foobar" in output

        # it takes a while for the docker daemon to remove the container after execution:
        # poll with a deadline instead of relying on a single fixed sleep
        deadline = time.monotonic() + 10
        while docker_client.list_containers(container_filter) and time.monotonic() < deadline:
            time.sleep(0.25)
        assert 0 == len(docker_client.list_containers(container_filter))

    def test_create_container_non_existing_image(self, docker_client: ContainerClient):
        """Creating a container from an unknown image raises ``NoSuchImage``."""
        with pytest.raises(NoSuchImage):
            docker_client.create_container("this_image_does_hopefully_not_exist_42069")

    def test_exec_in_container(
        self, docker_client: ContainerClient, dummy_container: ContainerInfo
    ):
        """The stdout of a command executed in a running container is returned."""
        docker_client.start_container(dummy_container.container_id)

        output, _ = docker_client.exec_in_container(
            dummy_container.container_id, command=["echo", "foobar"]
        )
        output = output.decode(config.DEFAULT_ENCODING)
        assert "foobar" == output.strip()

    def test_exec_in_container_not_running_raises_exception(
        self, docker_client: ContainerClient, dummy_container
    ):
        """Executing a command in a container that was never started raises."""
        with pytest.raises(ContainerException):
            # can't exec into a non-running container
            docker_client.exec_in_container(
                dummy_container.container_id, command=["echo", "foobar"]
            )

    def test_exec_in_container_with_env(self, docker_client: ContainerClient, dummy_container):
        """Variables passed via ``env_vars`` are visible to the executed command."""
        docker_client.start_container(dummy_container.container_id)

        env = {"MYVAR": "foo_var"}
        output, _ = docker_client.exec_in_container(
            dummy_container.container_id, env_vars=env, command=["env"]
        )
        output = output.decode(config.DEFAULT_ENCODING)
        assert "MYVAR=foo_var" in output

    def test_exec_error_in_container(self, docker_client: ContainerClient, dummy_container):
        """Executing a missing binary raises ``ContainerException`` with the error text."""
        docker_client.start_container(dummy_container.container_id)

        with pytest.raises(ContainerException) as ex:
            docker_client.exec_in_container(
                dummy_container.container_id, command=["./doesnotexist"]
            )
        assert ex.match("doesnotexist: no such file or directory")

    def test_create_container_with_max_env_vars(
        self, docker_client: ContainerClient, create_container
    ):
        """A container can be created with an environment of at least MAX_ENV_ARGS_LENGTH."""
        # default ARG_MAX=131072 in Docker
        env = {f"IVAR_{i:05d}": f"VAL_{i:05d}" for i in range(2000)}

        # make sure we're really triggering the relevant code
        assert len(str(env)) >= Util.MAX_ENV_ARGS_LENGTH

        info = create_container("alpine", env_vars=env, command=["env"])
        output, _ = docker_client.start_container(info.container_id, attach=True)
        output = output.decode(config.DEFAULT_ENCODING)

        assert "IVAR_00001=VAL_00001" in output
        assert "IVAR_01000=VAL_01000" in output
        assert "IVAR_01999=VAL_01999" in output

    def test_run_container(self, docker_client: ContainerClient):
        """``run_container`` returns the stdout of the executed command."""
        container_name = _random_container_name()
        try:
            output, _ = docker_client.run_container(
                "alpine",
                name=container_name,
                command=["echo", "foobared"],
            )
            output = output.decode(config.DEFAULT_ENCODING)
            assert "foobared" in output
        finally:
            docker_client.remove_container(container_name)

    def test_run_container_error(self, docker_client: ContainerClient):
        """``run_container`` raises ``ContainerException`` when the command does not exist."""
        container_name = _random_container_name()
        try:
            with pytest.raises(ContainerException):
                docker_client.run_container(
                    "alpine",
                    name=container_name,
                    command=["./doesnotexist"],
                )
        finally:
            docker_client.remove_container(container_name)

    def test_stop_non_existing_container(self, docker_client: ContainerClient):
        """Stopping an unknown container raises ``NoSuchContainer``."""
        with pytest.raises(NoSuchContainer):
            docker_client.stop_container("this_container_does_not_exist")

    def test_remove_non_existing_container(self, docker_client: ContainerClient):
        """Removing an unknown container with ``force=False`` raises ``NoSuchContainer``."""
        with pytest.raises(NoSuchContainer):
            docker_client.remove_container("this_container_does_not_exist", force=False)

    def test_start_non_existing_container(self, docker_client: ContainerClient):
        """Starting an unknown container raises ``NoSuchContainer``."""
        with pytest.raises(NoSuchContainer):
            docker_client.start_container("this_container_does_not_exist")

    def test_get_network(self, docker_client: ContainerClient, dummy_container):
        """A container created without a network option reports the "default" network."""
        n = docker_client.get_network(dummy_container.container_name)
        assert "default" == n

    def test_create_with_host_network(self, docker_client: ContainerClient, create_container):
        """A container created with ``network="host"`` reports the "host" network."""
        info = create_container("alpine", network="host")
        network = docker_client.get_network(info.container_name)
        assert "host" == network

    def test_create_with_port_mapping(self, docker_client: ContainerClient, create_container):
        """Creating a container with port mappings does not raise."""
        ports = PortMappings()
        ports.add(45122, 22)
        ports.add(45180, 80)
        create_container("alpine", ports=ports)

    def test_create_with_volume(self, tmpdir, docker_client: ContainerClient, create_container):
        """A file written by the container into a mounted path shows up in the host directory."""
        mount_volumes = [(tmpdir.realpath(), "/tmp/mypath")]

        c = create_container(
            "alpine",
            command=["sh", "-c", "echo 'foobar' > /tmp/mypath/foo.log"],
            mount_volumes=mount_volumes,
        )
        docker_client.start_container(c.container_id)
        assert tmpdir.join("foo.log").isfile(), "foo.log was not created in mounted dir"

    def test_copy_into_container(self, tmpdir, docker_client: ContainerClient, create_container):
        """Copy a single file to an explicit target file name inside the container."""
        local_path = tmpdir.join("myfile.txt")
        container_path = "/tmp/myfile_differentpath.txt"

        self._test_copy_into_container(
            docker_client,
            create_container,
            ["cat", container_path],
            local_path,
            local_path,
            container_path,
        )

    def test_copy_into_non_existent_container(self, tmpdir, docker_client: ContainerClient):
        """Copying into an unknown container raises ``NoSuchContainer``."""
        local_path = tmpdir.mkdir("test_dir")
        file_path = local_path.join("test_file")
        with file_path.open(mode="w") as fd:
            fd.write("foobared\n")

        with pytest.raises(NoSuchContainer):
            docker_client.copy_into_container(
                f"hopefully_non_existent_container_{short_uid()}", str(file_path), "test_file"
            )

    def test_copy_into_container_without_target_filename(
        self, tmpdir, docker_client: ContainerClient, create_container
    ):
        """Copy a single file into a target directory, keeping the source file name."""
        local_path = tmpdir.join("myfile.txt")
        container_path = "/tmp"

        self._test_copy_into_container(
            docker_client,
            create_container,
            ["cat", "/tmp/myfile.txt"],
            local_path,
            local_path,
            container_path,
        )

    def test_copy_directory_into_container(
        self, tmpdir, docker_client: ContainerClient, create_container
    ):
        """Copy a whole directory into the container under a different directory name."""
        local_path = tmpdir.join("fancy_folder")
        local_path.mkdir()
        file_path = local_path.join("myfile.txt")
        container_path = "/tmp/fancy_other_folder"

        self._test_copy_into_container(
            docker_client,
            create_container,
            ["cat", "/tmp/fancy_other_folder/myfile.txt"],
            file_path,
            local_path,
            container_path,
        )

    def _test_copy_into_container(
        self, docker_client, create_container, command, file_path, local_path, container_path
    ):
        """
        Shared body of the copy tests: write "foobared" to ``file_path``, copy ``local_path``
        to ``container_path`` in a fresh container, then start the container with ``command``
        and expect "foobared" in its output.
        """
        c = create_container("alpine", command=command)
        with file_path.open(mode="w") as fd:
            fd.write("foobared\n")

        docker_client.copy_into_container(c.container_name, str(local_path), container_path)

        output, _ = docker_client.start_container(c.container_id, attach=True)
        output = output.decode(config.DEFAULT_ENCODING)
        assert "foobared" in output

    def test_copy_into_container_with_existing_target(
        self, tmpdir, docker_client: ContainerClient, dummy_container
    ):
        """Copying a file over an existing file in the container replaces its content."""
        local_path = tmpdir.join("myfile.txt")
        container_path = "/tmp/myfile.txt"

        with local_path.open(mode="w") as fd:
            fd.write("foo\n")

        docker_client.start_container(dummy_container.container_id)
        docker_client.exec_in_container(
            dummy_container.container_id, command=["sh", "-c", f"echo bar > {container_path}"]
        )
        out, _ = docker_client.exec_in_container(
            dummy_container.container_id,
            command=["cat", container_path],
        )
        assert "bar" in out.decode(config.DEFAULT_ENCODING)

        docker_client.copy_into_container(
            dummy_container.container_id, str(local_path), container_path
        )
        out, _ = docker_client.exec_in_container(
            dummy_container.container_id,
            command=["cat", container_path],
        )
        assert "foo" in out.decode(config.DEFAULT_ENCODING)

    def test_copy_directory_content_into_container(
        self, tmpdir, docker_client: ContainerClient, dummy_container
    ):
        """Copying ``<dir>/.`` places the directory's files directly into the target directory."""
        local_path = tmpdir.join("fancy_folder")
        local_path.mkdir()

        file_path = local_path.join("myfile.txt")
        with file_path.open(mode="w") as fd:
            fd.write("foo\n")
        file_path = local_path.join("myfile2.txt")
        with file_path.open(mode="w") as fd:
            fd.write("bar\n")

        container_path = "/tmp/fancy_other_folder"
        docker_client.start_container(dummy_container.container_id)
        docker_client.exec_in_container(
            dummy_container.container_id, command=["mkdir", "-p", container_path]
        )
        docker_client.copy_into_container(
            dummy_container.container_id, f"{local_path}/.", container_path
        )

        out, _ = docker_client.exec_in_container(
            dummy_container.container_id,
            command=[
                "cat",
                f"{container_path}/myfile.txt",
                f"{container_path}/myfile2.txt",
            ],
        )
        decoded = out.decode(config.DEFAULT_ENCODING)
        assert "foo" in decoded
        assert "bar" in decoded

    def test_get_network_non_existing_container(self, docker_client: ContainerClient):
        """Looking up the network of an unknown container raises ``ContainerException``."""
        with pytest.raises(ContainerException):
            docker_client.get_network("this_container_does_not_exist")

    def test_list_containers(self, docker_client: ContainerClient, create_container):
        """All containers created by the test appear in the unfiltered listing."""
        c1 = create_container("alpine", command=["echo", "1"])
        c2 = create_container("alpine", command=["echo", "2"])
        c3 = create_container("alpine", command=["echo", "3"])

        container_list = docker_client.list_containers()
        assert len(container_list) >= 3

        container_names = [info["name"] for info in container_list]
        assert c1.container_name in container_names
        assert c2.container_name in container_names
        assert c3.container_name in container_names

    def test_list_containers_filter_non_existing(self, docker_client: ContainerClient):
        """Filtering by an id that matches nothing yields an empty list."""
        container_list = docker_client.list_containers(filter="id=DOES_NOT_EXST")
        assert 0 == len(container_list)

    def test_list_containers_filter_illegal_filter(self, docker_client: ContainerClient):
        """An unsupported filter key raises ``ContainerException``."""
        with pytest.raises(ContainerException):
            docker_client.list_containers(filter="illegalfilter=foobar")

    def test_list_containers_filter(self, docker_client: ContainerClient, create_container):
        """Listing honours id filters, name filters and a list combining both."""
        name_prefix = "filter_tests_"
        cn1 = name_prefix + _random_container_name()
        cn2 = name_prefix + _random_container_name()
        cn3 = name_prefix + _random_container_name()
        c1 = create_container("alpine", name=cn1, command=["echo", "1"])
        c2 = create_container("alpine", name=cn2, command=["echo", "2"])
        c3 = create_container("alpine", name=cn3, command=["echo", "3"])

        # per id
        container_list = docker_client.list_containers(filter=f"id={c2.container_id}")
        assert 1 == len(container_list)
        assert c2.container_id.startswith(container_list[0]["id"])
        assert c2.container_name == container_list[0]["name"]
        assert "created" == container_list[0]["status"]

        # per name pattern
        container_list = docker_client.list_containers(filter=f"name={name_prefix}")
        assert 3 == len(container_list)
        container_names = [info["name"] for info in container_list]
        assert c1.container_name in container_names
        assert c2.container_name in container_names
        assert c3.container_name in container_names

        # multiple patterns
        container_list = docker_client.list_containers(
            filter=[
                f"id={c1.container_id}",
                f"name={container_name_prefix}",
            ]
        )
        assert 1 == len(container_list)
        assert c1.container_name == container_list[0]["name"]

    def test_get_container_entrypoint(self, docker_client: ContainerClient):
        """The ``alpine`` image is expected to report an empty entrypoint."""
        entrypoint = docker_client.get_image_entrypoint("alpine")
        assert "" == entrypoint

    def test_get_container_entrypoint_non_existing_image(self, docker_client: ContainerClient):
        """Requesting the entrypoint of an unknown image raises ``NoSuchImage``."""
        with pytest.raises(NoSuchImage):
            docker_client.get_image_entrypoint("thisdoesnotexist")

    def test_get_container_command(self, docker_client: ContainerClient):
        """The ``alpine`` image is expected to report ``/bin/sh`` as its command."""
        command = docker_client.get_image_cmd("alpine")
        assert "/bin/sh" == command

    def test_get_container_command_non_existing_image(self, docker_client: ContainerClient):
        """Requesting the command of an unknown image raises ``NoSuchImage``."""
        with pytest.raises(NoSuchImage):
            docker_client.get_image_cmd("thisdoesnotexist")

    def test_create_start_container_with_stdin_to_stdout(self, docker_client: ContainerClient):
        """Data passed as stdin to an interactive ``cat`` container is echoed on stdout."""
        container_name = _random_container_name()
        message = "test_message_stdin"
        try:
            docker_client.create_container(
                "alpine",
                name=container_name,
                interactive=True,
                command=["cat"],
            )

            output, _ = docker_client.start_container(
                container_name, interactive=True, stdin=message.encode(config.DEFAULT_ENCODING)
            )

            assert message == output.decode(config.DEFAULT_ENCODING).strip()
        finally:
            docker_client.remove_container(container_name)

    def test_create_start_container_with_stdin_to_file(
        self, tmpdir, docker_client: ContainerClient
    ):
        """Data passed as stdin is written to a file that can be copied out of the container."""
        container_name = _random_container_name()
        message = "test_message_stdin"
        try:
            docker_client.create_container(
                "alpine",
                name=container_name,
                interactive=True,
                command=["sh", "-c", "cat > test_file"],
            )

            # the output is irrelevant here, the message ends up in the file
            docker_client.start_container(
                container_name, interactive=True, stdin=message.encode(config.DEFAULT_ENCODING)
            )
            target_path = tmpdir.join("test_file")
            docker_client.copy_from_container(container_name, str(target_path), "test_file")

            assert message == target_path.read().strip()
        finally:
            docker_client.remove_container(container_name)

    def test_run_container_with_stdin(self, docker_client: ContainerClient):
        """``run_container`` forwards stdin to the container command."""
        container_name = _random_container_name()
        message = "test_message_stdin"
        try:
            output, _ = docker_client.run_container(
                "alpine",
                name=container_name,
                interactive=True,
                stdin=message.encode(config.DEFAULT_ENCODING),
                command=["cat"],
            )

            assert message == output.decode(config.DEFAULT_ENCODING).strip()
        finally:
            docker_client.remove_container(container_name)

    def test_exec_in_container_with_stdin(self, docker_client: ContainerClient, dummy_container):
        """``exec_in_container`` forwards stdin to the executed command."""
        docker_client.start_container(dummy_container.container_id)
        message = "test_message_stdin"
        output, _ = docker_client.exec_in_container(
            dummy_container.container_id,
            interactive=True,
            stdin=message.encode(config.DEFAULT_ENCODING),
            command=["cat"],
        )

        assert message == output.decode(config.DEFAULT_ENCODING).strip()

    def test_exec_in_container_with_stdin_stdout_stderr(
        self, docker_client: ContainerClient, dummy_container
    ):
        """``exec_in_container`` returns stdout and stderr separately."""
        docker_client.start_container(dummy_container.container_id)
        message = "test_message_stdin"
        output, stderr = docker_client.exec_in_container(
            dummy_container.container_id,
            interactive=True,
            stdin=message.encode(config.DEFAULT_ENCODING),
            command=["sh", "-c", "cat; >&2 echo stderrtest"],
        )

        assert message == output.decode(config.DEFAULT_ENCODING).strip()
        assert "stderrtest" == stderr.decode(config.DEFAULT_ENCODING).strip()

    def test_run_detached_with_logs(self, docker_client: ContainerClient):
        """The logs of a detached container contain the output of its command."""
        container_name = _random_container_name()
        message = "test_message"
        try:
            output, _ = docker_client.run_container(
                "alpine",
                name=container_name,
                detach=True,
                command=["echo", message],
            )
            # in detached mode the returned output is used as the container id
            container_id = output.decode(config.DEFAULT_ENCODING).strip()
            logs = docker_client.get_container_logs(container_id)

            assert message == logs.strip()
        finally:
            docker_client.remove_container(container_name)

    def test_get_logs_non_existent_container(self, docker_client: ContainerClient):
        ...  # the excerpt is truncated here; the body of this test is not shown


# --- end of the test_docker.py excerpt; next excerpt on this page: test_storage.py ---
Source: test_storage.py
...34                obj.delete()35            container.delete()36    def test_containers(self):37        # make a new container38        container_name = _random_container_name()39        container = self.driver.create_container(container_name)40        self.assertEqual(container.name, container_name)41        container = self.driver.get_container(container_name)42        self.assertEqual(container.name, container_name)43        # check that an existing container can't be re-created44        with self.assertRaises(types.ContainerAlreadyExistsError):45            self.driver.create_container(container_name)46        # check that the new container can be listed47        containers = self.driver.list_containers()48        self.assertEqual([c.name for c in containers], [container_name])49        # delete the container50        self.driver.delete_container(container)51        # check that a deleted container can't be looked up52        with self.assertRaises(types.ContainerDoesNotExistError):53            self.driver.get_container(container_name)54        # check that the container is deleted55        containers = self.driver.list_containers()56        self.assertEqual([c.name for c in containers], [])57    def _test_objects(self, do_upload, do_download, size=1 * MB):58        content = os.urandom(size)59        blob_name = "testblob"60        container = self.driver.create_container(_random_container_name())61        # upload a file62        obj = do_upload(container, blob_name, content)63        self.assertEqual(obj.name, blob_name)64        obj = self.driver.get_object(container.name, blob_name)65        # check that the file can be listed66        blobs = self.driver.list_container_objects(container)67        self.assertEqual([blob.name for blob in blobs], [blob_name])68        # upload another file and check it's excluded in prefix listing69        do_upload(container, blob_name[::-1], content[::-1])70        blobs = self.driver.list_container_objects(container, 
ex_prefix=blob_name[0:3])71        self.assertEqual([blob.name for blob in blobs], [blob_name])72        # check that the file can be read back73        self.assertEqual(do_download(obj), content)74        # delete the file75        self.driver.delete_object(obj)76        # check that a missing file can't be deleted or looked up77        with self.assertRaises(types.ObjectDoesNotExistError):78            self.driver.delete_object(obj)79        with self.assertRaises(types.ObjectDoesNotExistError):80            self.driver.get_object(container.name, blob_name)81        # check that the file is deleted82        blobs = self.driver.list_container_objects(container)83        self.assertEqual([blob.name for blob in blobs], [blob_name[::-1]])84    def test_objects(self, size=1 * MB):85        def do_upload(container, blob_name, content):86            infile = self._create_tempfile(content=content)87            return self.driver.upload_object(infile, container, blob_name)88        def do_download(obj):89            outfile = self._create_tempfile()90            self.driver.download_object(obj, outfile, overwrite_existing=True)91            with open(outfile, "rb") as fobj:92                return fobj.read()93        self._test_objects(do_upload, do_download, size)94    def test_objects_range_downloads(self):95        blob_name = "testblob-range"96        content = b"0123456789"97        container = self.driver.create_container(_random_container_name())98        infile = self._create_tempfile(content=content)99        obj = self.driver.upload_object(infile, container, blob_name)100        self.assertEqual(obj.name, blob_name)101        self.assertEqual(obj.size, len(content))102        obj = self.driver.get_object(container.name, blob_name)103        self.assertEqual(obj.name, blob_name)104        self.assertEqual(obj.size, len(content))105        values = [106            {"start_bytes": 0, "end_bytes": 1, "expected_content": b"0"},107            {"start_bytes": 1, 
"end_bytes": 5, "expected_content": b"1234"},108            {"start_bytes": 5, "end_bytes": None, "expected_content": b"56789"},109            {"start_bytes": 5, "end_bytes": len(content), "expected_content": b"56789"},110            {"start_bytes": 0, "end_bytes": None, "expected_content": b"0123456789"},111            {112                "start_bytes": 0,113                "end_bytes": len(content),114                "expected_content": b"0123456789",115            },116        ]117        for value in values:118            # 1. download_object_range119            start_bytes = value["start_bytes"]120            end_bytes = value["end_bytes"]121            outfile = self._create_tempfile()122            result = self.driver.download_object_range(123                obj,124                outfile,125                start_bytes=start_bytes,126                end_bytes=end_bytes,127                overwrite_existing=True,128            )129            self.assertTrue(result)130            with open(outfile, "rb") as fobj:131                downloaded_content = fobj.read()132            if end_bytes is not None:133                expected_content = content[start_bytes:end_bytes]134            else:135                expected_content = content[start_bytes:]136            msg = 'Expected "%s", got "%s" for values: %s' % (137                expected_content,138                downloaded_content,139                str(value),140            )141            self.assertEqual(downloaded_content, expected_content, msg)142            self.assertEqual(downloaded_content, value["expected_content"], msg)143            # 2. 
download_object_range_as_stream144            downloaded_content = _read_stream(145                self.driver.download_object_range_as_stream(146                    obj, start_bytes=start_bytes, end_bytes=end_bytes147                )148            )149            self.assertEqual(downloaded_content, expected_content)150    @unittest.skipUnless(os.getenv("LARGE_FILE_SIZE_MB"), "config not set")151    def test_objects_large(self):152        size = int(float(os.environ["LARGE_FILE_SIZE_MB"]) * MB)153        self.test_objects(size)154    def test_objects_stream_io(self):155        def do_upload(container, blob_name, content):156            content = io.BytesIO(content)157            return self.driver.upload_object_via_stream(content, container, blob_name)158        def do_download(obj):159            return _read_stream(self.driver.download_object_as_stream(obj))160        self._test_objects(do_upload, do_download)161    def test_objects_stream_iterable(self):162        def do_upload(container, blob_name, content):163            content = iter([content[i : i + 1] for i in range(len(content))])164            return self.driver.upload_object_via_stream(content, container, blob_name)165        def do_download(obj):166            return _read_stream(self.driver.download_object_as_stream(obj))167        self._test_objects(do_upload, do_download)168    def test_upload_via_stream_with_content_encoding(self):169        object_name = "content_encoding.gz"170        content = gzip.compress(os.urandom(MB // 100))171        container = self.driver.create_container(_random_container_name())172        self.driver.upload_object_via_stream(173            iter(content), container, object_name, headers={"Content-Encoding": "gzip"}174        )175        obj = self.driver.get_object(container.name, object_name)176        self.assertEqual(obj.extra.get("content_encoding"), "gzip")177    def test_cdn_url(self):178        content = os.urandom(MB // 100)179        container = 
self.driver.create_container(_random_container_name())180        obj = self.driver.upload_object_via_stream(iter(content), container, "cdn")181        response = requests.get(self.driver.get_object_cdn_url(obj))182        response.raise_for_status()183        self.assertEqual(response.content, content)184    def _create_tempfile(self, prefix="", content=b""):185        fobj, path = tempfile.mkstemp(prefix=prefix, text=False)186        os.write(fobj, content)187        os.close(fobj)188        self.addCleanup(os.remove, path)189        return path190class AzureStorageTest(SmokeStorageTest):191    class Config:192        username = None193        password = None194        tenant = None195        subscription = None196        location = "eastus"197        template_file = os.path.splitext(__file__)[0] + ".arm.json"198        kind = "StorageV2"199    client = None200    resource_group_name = None201    @classmethod202    def setUpClass(cls):203        try:204            from azure.common.credentials import ServicePrincipalCredentials205            from azure.mgmt.resource import ResourceManagementClient206        except ImportError:207            raise unittest.SkipTest("missing azure-mgmt-resource library")208        cls.client = ResourceManagementClient(209            credentials=ServicePrincipalCredentials(210                client_id=cls.Config.username,211                secret=cls.Config.password,212                tenant=cls.Config.tenant,213            ),214            subscription_id=cls.Config.subscription,215        )216        cls.resource_group_name = "libcloudtest" + _random_string(length=23)217        cls.client.resource_groups.create_or_update(218            resource_group_name=cls.resource_group_name,219            parameters={"location": cls.Config.location},220        )221        with io.open(cls.Config.template_file, encoding="utf-8") as fobj:222            template = json.load(fobj)223        deployment = cls.client.deployments.create_or_update(224  
          resource_group_name=cls.resource_group_name,225            deployment_name=os.path.basename(__file__),226            properties={227                "template": template,228                "mode": "incremental",229                "parameters": {"storageAccountKind": {"value": cls.Config.kind}},230            },231        ).result()232        for key, value in deployment.properties.outputs.items():233            setattr(cls.Config, key.lower(), value["value"])234        cls.Config.provider = "azure_blobs"235    @classmethod236    def tearDownClass(cls):237        while True:238            groups = [239                group240                for group in cls.client.resource_groups.list()241                if group.name.startswith(cls.resource_group_name)242            ]243            if not groups:244                break245            for group in groups:246                if group.properties.provisioning_state != "Deleting":247                    try:248                        cls.client.resource_groups.delete(249                            resource_group_name=group.name, polling=False250                        )251                    except Exception:  # pylint: disable=broad-except252                        pass253            time.sleep(3)254class AzuriteStorageTest(SmokeStorageTest):255    class Config:256        port = 10000257        version = "latest"258    client = None259    container = None260    image = "arafato/azurite"261    has_sas_support = False262    @classmethod263    def setUpClass(cls):264        cls.client = _new_docker_client()265        cls.container = cls.client.containers.run(266            "{}:{}".format(cls.image, cls.Config.version),267            detach=True,268            auto_remove=True,269            ports={cls.Config.port: 10000},270            environment={"executable": "blob"},271        )272        cls.Config.account = "devstoreaccount1"273        cls.Config.secret = (274            
"Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uS"275            "RZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="276        )277        cls.Config.host = "localhost"278        cls.Config.secure = False279        cls.Config.provider = "azure_blobs"280        time.sleep(5)281    @classmethod282    def tearDownClass(cls):283        _kill_and_log(cls.container)284    def test_cdn_url(self):285        if not self.has_sas_support:286            self.skipTest("Storage backend has no account SAS support")287class AzuriteV3StorageTest(AzuriteStorageTest):288    image = "mcr.microsoft.com/azure-storage/azurite"289    has_sas_support = True290    def test_upload_via_stream_with_content_encoding(self):291        self.skipTest(292            "Possible bug in AzuriteV3, see https://github.com/Azure/Azurite/issues/629"293        )294class IotedgeStorageTest(SmokeStorageTest):295    class Config:296        port = 11002297        version = "latest"298    client = None299    container = None300    @classmethod301    def setUpClass(cls):302        cls.client = _new_docker_client()303        account = _random_string(10)304        key = base64.b64encode(_random_string(20).encode("ascii")).decode("ascii")305        cls.container = cls.client.containers.run(306            "mcr.microsoft.com/azure-blob-storage:{}".format(cls.Config.version),307            detach=True,308            auto_remove=True,309            ports={cls.Config.port: 11002},310            environment={311                "LOCAL_STORAGE_ACCOUNT_NAME": account,312                "LOCAL_STORAGE_ACCOUNT_KEY": key,313            },314        )315        cls.Config.account = account316        cls.Config.secret = key317        cls.Config.host = "localhost"318        cls.Config.secure = False319        cls.Config.provider = "azure_blobs"320        time.sleep(5)321    @classmethod322    def tearDownClass(cls):323        _kill_and_log(cls.container)324def _new_docker_client():325    try:326        import docker327    except 
ImportError:328        raise unittest.SkipTest("missing docker library")329    return docker.from_env()330def _kill_and_log(container):331    for line in container.logs().splitlines():332        print(line)333    container.kill()334def _random_container_name(prefix="test"):335    max_length = 63336    suffix = _random_string(max_length)337    name = prefix + suffix338    name = re.sub("[^a-z0-9-]", "-", name)339    name = re.sub("-+", "-", name)340    name = name[:max_length]341    name = name.lower()342    return name343def _random_string(length, alphabet=string.ascii_lowercase + string.digits):344    return "".join(random.choice(alphabet) for _ in range(length))345def _read_stream(stream):346    buffer = io.BytesIO()347    buffer.writelines(stream)348    buffer.seek(0)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
