Best Python code snippet using localstack_python
cluster.py
Source:cluster.py  
...82    tmp_dir = os.path.join(data_path, "tmp")83    data_dir = os.path.join(data_path, "data")84    backup_dir = os.path.join(data_path, "backup")85    return Directories(install_dir, tmp_dir, modules_dir, data_dir, backup_dir)86def build_cluster_run_command(cluster_bin: str, settings: CommandSettings) -> List[str]:87    """88    Takes the command settings dict and builds the actual command (which can then be executed as a shell command).89    :param cluster_bin: path to the OpenSearch/Elasticsearch binary (including the binary)90    :param settings: dictionary where each item will be set as a command arguments91    :return: list of strings for the command with the settings to be executed as a shell command92    """93    cmd_settings = [f"-E {k}={v}" for k, v, in settings.items()]94    return [cluster_bin] + cmd_settings95class OpensearchCluster(Server):96    """Manages an OpenSearch cluster which is installed an operated by LocalStack."""97    # TODO: legacy default port should be removed here98    def __init__(99        self, port=4571, host="localhost", version: str = None, directories: Directories = None100    ) -> None:101        super().__init__(port, host)102        self._version = version or self.default_version103        self.command_settings = {}104        self.directories = directories or self._resolve_directories()105    @property106    def default_version(self) -> str:107        return constants.OPENSEARCH_DEFAULT_VERSION108    @property109    def version(self) -> str:110        return self._version111    @property112    def install_version(self) -> str:113        _, install_version = versions.get_install_type_and_version(self._version)114        return install_version115    @property116    def bin_name(self) -> str:117        return "opensearch"118    @property119    def os_user(self):120        return constants.OS_USER_OPENSEARCH121    def health(self) -> Optional[str]:122        return get_cluster_health_status(self.url)123    def do_start_thread(self) -> FuncThread:124        self._ensure_installed()125        self._init_directories()126        cmd = self._create_run_command(additional_settings=self.command_settings)127        cmd = " ".join(cmd)128        if is_root() and self.os_user:129            # run the opensearch process as a non-root user (when running in docker)130            cmd = f"su {self.os_user} -c '{cmd}'"131        env_vars = self._create_env_vars()132        LOG.info("starting %s: %s with env %s", self.bin_name, cmd, env_vars)133        t = ShellCommandThread(134            cmd,135            env_vars=env_vars,136            strip_color=True,137            log_listener=self._log_listener,138        )139        t.start()140        return t141    def _resolve_directories(self) -> Directories:142        return resolve_directories(version=self.version, cluster_path=self.version)143    def _ensure_installed(self):144        install.install_opensearch(self.version)145    def _init_directories(self):146        init_directories(self.directories)147    def _base_settings(self, dirs) -> CommandSettings:148        settings = {149            "http.port": self.port,150            "http.publish_port": self.port,151            "transport.port": "0",152            "network.host": self.host,153            "http.compression": "false",154            "path.data": f'"{dirs.data}"',155            "path.repo": f'"{dirs.backup}"',156            "plugins.security.disabled": "true",157        }158        if os.path.exists(os.path.join(dirs.mods, "x-pack-ml")):159            settings["xpack.ml.enabled"] = "false"160        return settings161    def _create_run_command(162        self, additional_settings: Optional[CommandSettings] = None163    ) -> List[str]:164        # delete opensearch data that may be cached locally from a previous test run165        dirs = self.directories166        bin_path = os.path.join(self.directories.install, "bin", self.bin_name)167        settings = self._base_settings(dirs)168        if additional_settings:169            settings.update(additional_settings)170        cmd = build_cluster_run_command(bin_path, settings)171        return cmd172    def _create_env_vars(self) -> Dict:173        return {174            "OPENSEARCH_JAVA_OPTS": os.environ.get("OPENSEARCH_JAVA_OPTS", "-Xms200m -Xmx600m"),175            "OPENSEARCH_TMPDIR": self.directories.tmp,176        }177    def _log_listener(self, line, **_kwargs):178        # logging the port before each line to be able to connect logs to specific instances179        LOG.info("[%s] %s", self.port, line.rstrip())180class CustomEndpoint:181    """182    Encapsulates a custom endpoint (combines CustomEndpoint and CustomEndpointEnabled within the DomainEndpointOptions183    of the cluster, i.e. combines two fields from the AWS OpenSearch service model).184    """...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
