Best Python code snippet using localstack_python
opensearchTools.py
Source:opensearchTools.py  
1'''2Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.3Licensed under the Apache License, Version 2.0 (the "License");4you may not use this file except in compliance with the License.5You may obtain a copy of the License at6    http://www.apache.org/licenses/LICENSE-2.07Unless required by applicable law or agreed to in writing, software8distributed under the License is distributed on an "AS IS" BASIS,9WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.10See the License for the specific language governing permissions and11limitations under the License.12'''13import json14import logging15import os16import re17import boto318import requests19from crhelper import CfnResource20from requests_aws4auth import AWS4Auth21logger = logging.getLogger(__name__)22helper_config = CfnResource(json_logging=False,23                            log_level='DEBUG',24                            boto_level='CRITICAL',25                            sleep_on_delete=120)26region = os.environ['AWS_REGION']27KIBANA_HEADERS = {'Content-Type': 'application/json', 'kbn-xsrf': 'true'}28OPENSEARCH_DASHBOARD_HEADERS = {'Content-Type': 'application/json', 'osd-xsrf': 'true'}29def auth_opensearch(opensearch_endpoint):30    service = 'es'31    credentials = boto3.Session().get_credentials()32    awsauth = AWS4Auth(credentials.access_key,33                       credentials.secret_key,34                       region,35                       service,36                       session_token=credentials.token)37    return awsauth38def output_message(key, res):39    return (f'{key}: status={res.status_code}, message={res.text}')40def query_opensearch(opensearch_endpoint,41                     awsauth,42                     method=None,43                     path=None,44                     payload=None,45                     headers=None):46    if not headers:47        headers = {'Content-Type': 'application/json'}48    url = 'https://' + opensearch_endpoint + '/' + path49    if method.lower() == 'get':50        res = requests.get(url, auth=awsauth, stream=True)51    elif method.lower() == 'post':52        res = requests.post(url, auth=awsauth, json=payload, headers=headers)53    elif method.lower() == 'put':54        res = requests.put(url, auth=awsauth, json=payload, headers=headers)55    elif method.lower() == 'patch':56        res = requests.put(url, auth=awsauth, json=payload, headers=headers)57    elif method.lower() == 'head':58        res = requests.head(url, auth=awsauth, json=payload, headers=headers)59    return (res)60def set_tenant_get_cookies(engineType, opensearch_endpoint, tenant, auth):61    if engineType == 'OpenSearch':62        base_url = f'https://{opensearch_endpoint}/_dashboards'63        headers = OPENSEARCH_DASHBOARD_HEADERS64    else:65        base_url = f'https://{opensearch_endpoint}/_plugin/kibana'66        headers = KIBANA_HEADERS67    if isinstance(auth, dict):68        url = f'{base_url}/auth/login?security_tenant={tenant}'69        response = requests.post(url,70                                 headers=headers,71                                 json=json.dumps(auth))72    elif isinstance(auth, AWS4Auth):73        url = f'{base_url}/app/dashboards?security_tenant={tenant}'74        response = requests.get(url, headers=headers, auth=auth)75    else:76        logger.error('There is no valid authentication')77        return False78    if response.status_code in (200, ):79        logger.info('Authentication success to access kibana')80        return response.cookies81    else:82        print(response.cookies)83        logger.error("Authentication failed to access kibana")84        logger.error(response.reason)85        return False86def configure_aes_total_fields_limit(opensearch_endpoint, indexName, limit_number,87                                     awsauth):88    payload = {"index.mapping.total_fields.limit": limit_number}89    path = str(indexName) + '/_settings'90    res = query_opensearch(opensearch_endpoint, awsauth, 'PUT', path, payload)91    if res.status_code == 200:92        logger.info("Change total_fields limit for index: %s to %d." %93                    (indexName, limit_number))94        return 'success'95    else:96        logger.error(output_message(path, res))97        return 'fail'98@helper_config.create99@helper_config.update100def update_total_fields_limit(opensearch_endpoint, indexName, limit_number):101    awsauth = auth_opensearch(opensearch_endpoint)102    state = configure_aes_total_fields_limit(opensearch_endpoint, indexName,103                                             limit_number, awsauth)104    return state105@helper_config.create106@helper_config.update107def index_exists(opensearch_endpoint, indexName):108    awsauth = auth_opensearch(opensearch_endpoint)109    path = str(indexName)110    res = query_opensearch(opensearch_endpoint, awsauth, 'HEAD', path)111    if res.status_code == 200:112        logger.info("Index name: %s exists." % (indexName))113        return 'success'114    else:115        logger.info("Index name: %s doesn't exist." % (indexName))116        return 'fail'117@helper_config.create118@helper_config.update119def create_index(opensearch_endpoint, indexName):120    awsauth = auth_opensearch(opensearch_endpoint)121    payload = {122        "settings": {123            "index.mapping.ignore_malformed": "true"124        }125    }126    path = str(indexName)127    res = query_opensearch(opensearch_endpoint, awsauth, 'PUT', path, payload)128    if res.status_code == 200:129        logger.info("Create Index %s." % (indexName))...opensearch_index_id.py
Source:opensearch_index_id.py  
1from __future__ import annotations2class OpensearchIndexId:3    """4    Build OpenSearch Index Id using given endpoint and index name or resolve the index name from given resource Id.5    """6    def __init__(self, opensearch_endpoint: str, index_name: str) -> None:7        self.opensearch_endpoint = opensearch_endpoint8        self.index_name = index_name9    def make_resource_id(self):10        """11        Make resource id of OpenSearch index by concatenating given endpoint and index name.12        OpenSearch endpoint and index name concatenated using delimiter '||'.13        :param opensearch_domain: OpenSearch domain endpoint.14        :param index_name: Index name.15        :return: Resource id of OpenSearch index.16        """17        return f'{self.opensearch_endpoint}||{self.index_name}'18    @staticmethod19    def resource_id(resource_id: str) -> OpensearchIndexId:20        """21        Split given resource_id using delimiter '||' and initialize a class.22        :param resource_id: OpenSearch index resource id e.g. opensearch.eu-central-1.es.amazonaws.com||posts-3qs1999pg-c23        :return: OpensearchIndexId class instance.24        """...opensearch_client.py
Source:opensearch_client.py  
1import pytest2from b_aws_testing_framework.credentials import Credentials3from b_cfn_opensearch_index_layer.opensearch_client import OpensearchClient4from b_cfn_opensearch_index_tests.integration.infrastructure.main_stack import MainStack5@pytest.fixture(scope='session')6def opensearch_client() -> OpensearchClient:7    opensearch_endpoint = MainStack.get_output(MainStack.OPENSEARCH_DOMAIN_ENDPOINT)8    return OpensearchClient(9        boto3_session=Credentials().boto_session,10        opensearch_endpoint=opensearch_endpoint11    )12__all__ = [13    'opensearch_client'...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
