How to use es_client method in localstack

Best Python code snippet using localstack_python

indexManagementClient.py

Source:indexManagementClient.py Github

copy

Full Screen

#!/bin/python
"""Elasticsearch index-management helpers (aliases, rollover, deletion).

Every helper builds a fresh client through getEsClient(). Helpers that
report details to their caller do so by overwriting the response file at
/tmp/response.txt.
"""
import json
import os
import sys
from ssl import create_default_context

from elasticsearch import Elasticsearch

_TOKEN_FILE = '/var/run/secrets/kubernetes.io/serviceaccount/token'
_CA_FILE = '/etc/indexmanagement/keys/admin-ca'
_RESPONSE_FILE = '/tmp/response.txt'
_DEFAULT_CONNECT_TIMEOUT = 30


def _connectTimeout():
    """Return the CONNECT_TIMEOUT environment setting as a number.

    os.getenv() yields a str whenever the variable is set, so the value is
    converted before it is handed to the client. An unset or non-numeric
    value falls back to the default of 30.
    """
    raw = os.getenv('CONNECT_TIMEOUT')
    if raw is None:
        return _DEFAULT_CONNECT_TIMEOUT
    try:
        return float(raw)
    except ValueError:
        return _DEFAULT_CONNECT_TIMEOUT


def _writeResponse(content):
    """Overwrite the response file with ``content`` followed by a newline.

    Writing through a context manager (instead of rebinding sys.stdout)
    guarantees the file is flushed and closed, and leaves stdout untouched
    even when the write itself fails.
    """
    with open(_RESPONSE_FILE, 'w') as response_file:
        print(content, file=response_file)


def getEsClient():
    """Build an Elasticsearch client for the service named by ES_SERVICE.

    The client authenticates with the service-account bearer token and
    verifies TLS against the admin CA file.

    Raises:
        KeyError: if the ES_SERVICE environment variable is not set.
        OSError: if the token file cannot be read.
    """
    es_service = os.environ['ES_SERVICE']
    with open(_TOKEN_FILE, 'r') as token_file:
        # Surrounding whitespace is never valid inside a header value.
        bearer_token = token_file.read().strip()
    context = create_default_context(cafile=_CA_FILE)
    return Elasticsearch(
        [es_service],
        timeout=_connectTimeout(),
        max_retries=5,
        retry_on_timeout=True,
        headers={"authorization": f"Bearer {bearer_token}"},
        ssl_context=context,
    )


def getAlias(alias):
    """Return the alias lookup result as a JSON string, or "" on failure."""
    try:
        es_client = getEsClient()
        return json.dumps(es_client.indices.get_alias(name=alias))
    except Exception:
        # Best effort: callers treat an empty string as "no result".
        return ""


def getIndicesAgeForAlias(alias):
    """Return the ``index.creation_date`` settings for ``alias`` as JSON.

    Returns "" on any failure.
    """
    try:
        es_client = getEsClient()
        settings = es_client.indices.get_settings(
            index=alias, name="index.creation_date")
        return json.dumps(settings)
    except Exception:
        return ""


def deleteIndices(index):
    """Delete ``index``; return True on success.

    On failure the exception text is written to the response file and
    False is returned.
    """
    try:
        es_client = getEsClient()
        es_client.indices.delete(index=index)
        return True
    except Exception as e:
        _writeResponse(e)
        return False


def removeAsWriteIndexForAlias(index, alias):
    """Mark ``index`` as a non-write index of ``alias``.

    Returns the ``acknowledged`` field of the response. Unlike the other
    helpers this one does not catch errors; they propagate to the caller.
    """
    es_client = getEsClient()
    response = es_client.indices.update_aliases({
        "actions": [
            {"add": {"index": f"{index}", "alias": f"{alias}", "is_write_index": False}}
        ]
    })
    return response['acknowledged']


def catWriteAliases(policy):
    """Return the distinct aliases matching ``<policy>*-write``.

    The names are sorted and joined with single spaces. Returns "" on any
    failure or when nothing matches.
    """
    try:
        es_client = getEsClient()
        alias_name = f"{policy}*-write"
        response = es_client.cat.aliases(name=alias_name, h="alias")
        # split() with no argument drops the empty trailing line and any
        # column padding, so the result never starts with a stray space.
        return " ".join(sorted(set(response.split())))
    except Exception:
        return ""


def rolloverForPolicy(alias, decoded):
    """Roll over ``alias`` using the conditions in ``decoded``.

    On success the JSON response is written to the response file and True
    is returned. On any failure False is returned and nothing is written.
    """
    try:
        es_client = getEsClient()
        response = es_client.indices.rollover(alias=alias, body=decoded)
        _writeResponse(json.dumps(response))
        return True
    except Exception:
        return False


def checkIndexExists(index):
    """Return whether ``index`` exists.

    On failure the exception text is written to the response file and
    False is returned.
    """
    try:
        es_client = getEsClient()
        return es_client.indices.exists(index=index)
    except Exception as e:
        _writeResponse(e)
        return False


def updateWriteIndex(currentIndex, nextIndex, alias):
    """Move the write index of ``alias`` from ``currentIndex`` to ``nextIndex``.

    Both alias actions are sent in a single update_aliases request. Returns
    the ``acknowledged`` field of the response on success. On failure the
    exception text is written to the response file.
    """
    try:
        es_client = getEsClient()
        response = es_client.indices.update_aliases({
            "actions": [
                {"add": {"index": f"{currentIndex}", "alias": f"{alias}", "is_write_index": False}},
                {"add": {"index": f"{nextIndex}", "alias": f"{alias}", "is_write_index": True}}
            ]
        })
        return response['acknowledged']
    except Exception as e:
        _writeResponse(e)
        # NOTE(review): the source listing is truncated after this point, so
        # the value returned on failure is not visible here -- confirm
        # against the full file.
        # ...

Full Screen

Full Screen

elastic.py

Source:elastic.py Github

copy

Full Screen

...4from tests.functional.settings import settings5from tests.functional.testdata.data_for_es import data_for_elastic6from tests.functional.testdata.indexes import indexes7@pytest.fixture(scope='session')8async def es_client():9 client = AsyncElasticsearch(hosts=f'{settings.ES_HOST}:{settings.ES_PORT}')10 yield client11 await client.close()12@pytest.fixture(autouse=True)13async def create_es_data(es_client):14 for item in ['movies', 'genres', 'persons']:15 index = indexes.get(item)16 if not await es_client.indices.exists(index=item):17 await es_client.indices.create(index=item, body=index)18 await es_client.bulk(data_for_elastic())19 await asyncio.sleep(1)20 yield...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful