How to use test_integration method in localstack

Best Python code snippet using localstack_python
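Before looking at the community snippets below, here is a minimal sketch of what a test_integration-style test against LocalStack itself usually looks like: boto3 is pointed at LocalStack's local edge endpoint instead of real AWS, and the test exercises a service end to end. The endpoint URL, region, bucket name, and dummy credentials are illustrative assumptions, not taken from any of the snippets.

# Minimal sketch (not from the snippets below): a pytest-style integration test
# that talks to a locally running LocalStack instance via boto3.
# Assumptions: LocalStack is listening on http://localhost:4566 (its default
# edge port) and the S3 service is enabled; the bucket name is made up.
import boto3
import pytest

LOCALSTACK_ENDPOINT = "http://localhost:4566"  # assumed default edge endpoint

@pytest.fixture
def s3_client():
    # Credentials are dummies; LocalStack does not validate them by default.
    return boto3.client(
        "s3",
        endpoint_url=LOCALSTACK_ENDPOINT,
        region_name="us-east-1",
        aws_access_key_id="test",
        aws_secret_access_key="test",
    )

def test_integration_s3_roundtrip(s3_client):
    # Create a bucket, write an object, and read it back.
    s3_client.create_bucket(Bucket="example-bucket")
    s3_client.put_object(Bucket="example-bucket", Key="hello.txt", Body=b"hello")
    body = s3_client.get_object(Bucket="example-bucket", Key="hello.txt")["Body"].read()
    assert body == b"hello"

Run it with pytest while a LocalStack container is listening on the assumed port.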

Test_Integration.py

Source: Test_Integration.py (GitHub)



import os
import sys  # used below by sys.path.append and sys.exit
import requests
import integration_test_utility as util
import pytest
from oeda.databases import setup_experiment_database, db
from sumolib import checkBinary
from oeda.config.R_config import Config
from oeda.log import *
from oeda.rtxlib.dataproviders import createInstance
'''
 Integration test for OEDA using CrowdNav, which should be running in the background:
 the main method contains a test suite that executes the following tests:
 - checks db connection
 - mlrMBO-API connection
 - SUMO connection
 - data providers and their connections defined in config/crowdnav_config/data_providers.json
 - knobs defined in config/crowdnav_config/knobs.json
 - parsing knobs
 - target system creation
 - experiment creation and execution
 - testing stages and data points in the experiment

 The class name should also start with "Test" in order to be discovered by Pytest
 Usage: pytest -s -v Test_Integration.py

'''
@pytest.mark.incremental
class Test_Integration():
    # docker without http authentication can also be used
    # by setting host as "192.168.99.100" in oeda.databases.experiment_db_config.json
    elasticsearch_ip = None
    elasticsearch_port = None
    for_tests = True
    knobs = None  # for integration tests, they're used as default variables
    changeableVariables = None
    considered_data_types = None  # these are a subset of all data types, but they account for the weight in the overall result
    data_providers = None

    target_system = None
    experiment = None
    stage_ids_anova = None
    stage_ids_bogp = None
    stage_ids_ttest = None
    analysis = None

    def test_db_1(self):
        config = util.parse_config(["oeda", "databases"], "experiment_db_config")
        assert config
        assert config["host"]
        assert config["port"]
        assert config["index_definitions"].keys()
        Test_Integration.elasticsearch_ip = str(config["host"])
        Test_Integration.elasticsearch_port = str(config["port"])
        setup_experiment_database("elasticsearch", Test_Integration.elasticsearch_ip, Test_Integration.elasticsearch_port, for_tests=Test_Integration.for_tests)
        assert db()

    def test_db_2(self):
        health = db().es.cluster.health()
        # yellow means that the primary shard is allocated but replicas are not
        # and green means that all shards are allocated
        assert health["status"] == 'yellow'

    # uses regular http GET request
    def test_db_3(self):
        res = requests.get("http://" + Test_Integration.elasticsearch_ip + ":" + Test_Integration.elasticsearch_port).json()
        assert res["cluster_name"]
        assert res["cluster_uuid"]

    def test_mlrMBO_connection(self):
        assert Config.plumber_host
        assert Config.plumber_port
        res = requests.get("http://" + Config.plumber_host + ":" + str(Config.plumber_port)).text
        info(res, Fore.CYAN)
        assert str(res) == '["Plumber API is running"]'

    def test_sumo(self):
        try:
            var = os.environ.get("SUMO_HOME")
            assert var
            sys.path.append(var)
            sumoGuiBinary = checkBinary('sumo-gui')
            assert sumoGuiBinary
            sumoBinary = checkBinary('sumo')
            assert sumoBinary
        except ImportError:
            sys.exit("please declare environment variable 'SUMO_HOME' as the root directory"
                     " of your sumo installation (it should contain folders 'bin', 'tools' and 'docs')")

    # as we're only using crowdnav_config for now,
    # make sure the right values have been set in the dataProviders.json file under the oeda/crowdnav_config folder
    # also you need to add a 127.0.0.1 kafka entry to the /etc/hosts file because we're using kafka:9092 there
    # one important point: we need to set dp["instance"] to None after the assertion
    # because it will be created by the oeda.rtxlib.dataproviders.init_data_providers method
    # if we don't do so, ES gives a serialization error while saving
    def test_data_provider(self):
        data_providers = util.parse_config(["oeda", "config", "crowdnav_config"], "dataProviders")
        assert data_providers
        for dp in data_providers:
            assert dp["type"]
            assert dp["serializer"]
            createInstance(wf=None, cp=dp)
            assert dp["instance"]
            dp["instance"] = None
        Test_Integration.data_providers = data_providers

    def test_knobs(self):
        knobs = util.parse_config(["oeda", "config", "crowdnav_config"], "knobs")
        assert knobs
        # integrity check
        for knob in knobs:
            assert knob["name"]
            assert knob["description"]
            assert knob["scale"]
            assert type(knob["min"]) is float or type(knob["min"]) is int
            assert type(knob["max"]) is float or type(knob["max"]) is int
            assert type(knob["default"]) is float or type(knob["default"]) is int
        Test_Integration.knobs = knobs

    def test_considered_data_types(self):
        data_providers = util.parse_config(["oeda", "config", "crowdnav_config"], "dataProviders")
        assert data_providers
        # integrity check
        for dp in data_providers:
            assert dp["name"]
            assert dp["description"]
            assert dp["type"]
            assert dp["serializer"]
            assert dp["incomingDataTypes"]
            for dt in dp["incomingDataTypes"]:
                assert dt["name"]
                assert dt["description"]
                assert dt["scale"]
                assert dt["dataProviderName"]
                assert dt["criteria"]
            if dp["name"] == "Trips":
                considered_data_types = util.adjust_functions_and_weights(dp["incomingDataTypes"])
                assert considered_data_types
                Test_Integration.considered_data_types = considered_data_types

    def test_analysis(self):
        analysis = util.create_analysis_definition(type='3_phase', anovaAlpha=0.05, sample_size=100, tTestEffectSize=0.7)
        assert analysis
        Test_Integration.analysis = analysis

    def test_changeable_variables(self):
        # at least 2 variables (factors) should be present if we want to use two-way ANOVA
        changeableVariables = util.create_changeable_variables(numberOfVariables=2)
        assert changeableVariables
        Test_Integration.changeableVariables = changeableVariables

    # this case must be executed before the rest below
    def test_create_target_system(self):
        target_system = util.create_ts_definition_crowdnav(data_providers=Test_Integration.data_providers,
                                                           knobs=Test_Integration.knobs,
                                                           changeableVariables=Test_Integration.changeableVariables,
                                                           ignore_first_n_samples=30)
        assert target_system
        db().save_target(target_system)
        Test_Integration.target_system = target_system

    def test_create_experiment(self):
        experiment = util.create_experiment_with_mlr_mbo("mlr_mbo",
                                                         sample_size=20,
                                                         knobs=Test_Integration.knobs,
                                                         considered_data_types=Test_Integration.considered_data_types,
                                                         analysis=Test_Integration.analysis,
                                                         optimizer_iterations_in_design=len(Test_Integration.knobs)*4,
                                                         acquisition_method="ei",
                                                         optimizer_iterations=5)
        assert experiment
        assert experiment["id"]
        experiment["targetSystemId"] = Test_Integration.target_system["id"]
        db().save_experiment(experiment)
        saved_experiment = db().get_experiment(experiment["id"])
        assert saved_experiment
        Test_Integration.experiment = experiment

    def test_execution(self):
        workflow = util.rtx_execution(experiment=Test_Integration.experiment, target=Test_Integration.target_system)
        assert workflow
        target_status = db().get_target(Test_Integration.target_system["id"])["status"]
        assert target_status == "READY"
        experiment_status = db().get_experiment(Test_Integration.experiment["id"])["status"]
        assert experiment_status == "SUCCESS"
        self.test_anova()
        self.test_anova_data_points()

    def test_anova(self):
        experiment_id = Test_Integration.experiment["id"]
        assert experiment_id
        stage_ids_anova = db().get_stages(experiment_id=experiment_id, step_no=1)[0]  # 0 = _ids, 1 = _source
        assert stage_ids_anova
        Test_Integration.stage_ids_anova = stage_ids_anova

    def test_anova_data_points(self):
        for idx, stage_id in enumerate(Test_Integration.stage_ids_anova):
            assert stage_id
            data_points = db().get_data_points(experiment_id=Test_Integration.experiment["id"], step_no=1, stage_no=idx + 1)  # stages start from 1 whereas idx starts from 0
            for point in data_points:
                assert point["payload"]...
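Note that the @pytest.mark.incremental marker used in this snippet is not a built-in pytest feature; it is conventionally enabled by a pair of hooks in conftest.py so that once one test in the class fails, the remaining tests are reported as expected failures instead of producing a cascade of errors. Below is a minimal sketch of that conftest.py pattern as described in the pytest documentation; the OEDA repository may wire it up differently.

# conftest.py - minimal sketch of the "incremental" marker pattern from the
# pytest docs; the project behind this snippet may implement it differently.
import pytest

def pytest_runtest_makereport(item, call):
    if "incremental" in item.keywords:
        if call.excinfo is not None:
            # Remember which test in this class failed.
            item.parent._previousfailed = item

def pytest_runtest_setup(item):
    if "incremental" in item.keywords:
        previousfailed = getattr(item.parent, "_previousfailed", None)
        if previousfailed is not None:
            # Skip the remaining tests in the class as expected failures.
            pytest.xfail("previous test failed (%s)" % previousfailed.name)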


test_http.py

Source: test_http.py (GitHub)



from subprocess import Popen
import time
import os
from random import randint
from pathlib import Path
import unittest
import requests
import psutil
from mindsdb.utilities.config import Config
import importlib.util

common_path = Path(__file__).parent.parent.absolute().joinpath('flows/common.py').resolve()
spec = importlib.util.spec_from_file_location("common", str(common_path))
common = importlib.util.module_from_spec(spec)
spec.loader.exec_module(common)

rand = randint(0, pow(10, 12))
ds_name = f'hr_ds_{rand}'
pred_name = f'hr_predictor_{rand}'
root = 'http://localhost:47334'

class HTTPTest(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        config = Config(common.TEST_CONFIG)
        cls.initial_integrations_names = list(config['integrations'].keys())
        config_path = common.prepare_config(config, ['default_mariadb', 'default_clickhouse'])
        cls.sp = Popen(
            ['python3', '-m', 'mindsdb', '--api', 'http', '--config', config_path],
            close_fds=True,
            stdout=None,
            stderr=None
        )
        for i in range(20):
            try:
                res = requests.get(f'{root}/util/ping')
                if res.status_code != 200:
                    raise Exception('')
                else:
                    break
            except Exception:
                time.sleep(1)
                if i == 19:
                    raise Exception("Can't connect!")

    @classmethod
    def tearDownClass(cls):
        try:
            conns = psutil.net_connections()
            pid = [x.pid for x in conns if x.status == 'LISTEN' and x.laddr[1] == 47334 and x.pid is not None]
            if len(pid) > 0:
                os.kill(pid[0], 9)
            cls.sp.kill()
        except Exception:
            pass

    def test_1_config(self):
        res = requests.get(f'{root}/config/integrations')
        assert res.status_code == 200
        integration_names = res.json()
        for integration_name in integration_names['integrations']:
            assert integration_name in self.initial_integrations_names
        test_integration_data = {'enabled': False, 'host': 'test', 'type': 'clickhouse'}
        res = requests.put(f'{root}/config/integrations/test_integration', json={'params': test_integration_data})
        assert res.status_code == 200
        res = requests.get(f'{root}/config/integrations/test_integration')
        assert res.status_code == 200
        test_integration = res.json()
        print(test_integration, len(test_integration))
        assert len(test_integration) == 6
        res = requests.delete(f'{root}/config/integrations/test_integration')
        assert res.status_code == 200
        res = requests.get(f'{root}/config/integrations/test_integration')
        assert res.status_code != 200
        for k in test_integration_data:
            assert test_integration[k] == test_integration_data[k]
        for name in ['default_mariadb', 'default_clickhouse']:
            # Get the original
            res = requests.get(f'{root}/config/integrations/{name}')
            assert res.status_code == 200
            integration = res.json()
            for k in ['enabled', 'host', 'port', 'type', 'user']:
                assert k in integration
                assert integration[k] is not None
            assert integration['password'] is None
            # Modify it
            res = requests.post(
                f'{root}/config/integrations/{name}',
                json={'params': {'user': 'dr.Who'}}
            )
            res = requests.get(f'{root}/config/integrations/{name}')
            assert res.status_code == 200
            modified_integration = res.json()
            assert modified_integration['password'] is None
            assert modified_integration['user'] == 'dr.Who'
            for k in integration:
                if k not in ['password', 'date_last_update', 'user']:
                    assert modified_integration[k] == integration[k]
            # Put the original values back in
            del integration['password']
            res = requests.post(f'{root}/config/integrations/{name}', json={'params': integration})
            res = requests.get(f'{root}/config/integrations/{name}')
            assert res.status_code == 200
            modified_integration = res.json()
            for k in integration:
                if k != 'date_last_update':
                    assert modified_integration[k] == integration[k]

    def test_2_put_ds(self):
        # PUT datasource
        params = {
            'name': ds_name,
            'source_type': 'url',
            'source': 'https://raw.githubusercontent.com/mindsdb/mindsdb-examples/master/classics/home_rentals/dataset/train.csv'
        }
        url = f'{root}/datasources/{ds_name}'
        res = requests.put(url, json=params)
        assert res.status_code == 200
        db_ds_name = ds_name + '_db'
        params = {
            'name': db_ds_name,
            'query': 'SELECT arrayJoin([1,2,3]) as a, arrayJoin([1,2,3,4,5,6,7,8]) as b',
            'integration_id': 'default_clickhouse'
        }
        url = f'{root}/datasources/{db_ds_name}'
        res = requests.put(url, json=params)
        assert res.status_code == 200
        ds_data = res.json()
        assert ds_data['source_type'] == 'default_clickhouse'
        assert ds_data['row_count'] == 3 * 8

    def test_3_analyze(self):
        response = requests.get(f'{root}/datasources/{ds_name}/analyze')
        assert response.status_code == 200

    def test_3_put_predictor(self):
        # PUT predictor
        params = {
            'data_source_name': ds_name,
            'to_predict': 'rental_price',
            'kwargs': {
                'stop_training_in_x_seconds': 5,
                'join_learn_process': True
            }
        }
        url = f'{root}/predictors/{pred_name}'
        res = requests.put(url, json=params)
        assert res.status_code == 200
        # POST predictions
        params = {
            'when': {'sqft': 500}
        }
        url = f'{root}/predictors/{pred_name}/predict'
        res = requests.post(url, json=params)
        assert isinstance(res.json()[0]['rental_price']['predicted_value'], float)
        assert res.status_code == 200

    def test_4_datasources(self):
        """
        Call list datasources endpoint
        THEN check the response is success
        """
        response = requests.get(f'{root}/datasources/')
        assert response.status_code == 200

    def test_5_datasource_not_found(self):
        """
        Call unexisting datasource
        then check the response is NOT FOUND
        """
        response = requests.get(f'{root}/datasource/dummy_source')
        assert response.status_code == 404

    def test_6_ping(self):
        """
        Call utilities ping endpoint
        THEN check the response is success
        """
        response = requests.get(f'{root}/util/ping')
        assert response.status_code == 200

    def test_7_predictors(self):
        """
        Call list predictors endpoint
        THEN check the response is success
        """
        response = requests.get(f'{root}/predictors/')
        assert response.status_code == 200

    def test_8_predictor_not_found(self):
        """
        Call unexisting predictor
        then check the response is NOT FOUND
        """
        response = requests.get(f'{root}/predictors/dummy_predictor')
        assert response.status_code == 404

if __name__ == '__main__':...
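A pattern worth extracting from this suite is the readiness loop in setUpClass: the HTTP API is started as a subprocess and /util/ping is polled until it answers before any test_integration request is issued. A standalone sketch of that loop is shown below; the URL, retry count, and timeouts are illustrative values, not taken from the snippet.

# Sketch of the "wait until the HTTP API is up" pattern used in setUpClass above.
# The example URL and timeout values are illustrative, not from the snippet.
import time
import requests

def wait_for_ready(url: str, retries: int = 20, delay: float = 1.0) -> None:
    """Poll `url` until it returns HTTP 200, or raise after `retries` attempts."""
    for _ in range(retries):
        try:
            if requests.get(url, timeout=2).status_code == 200:
                return
        except requests.RequestException:
            pass
        time.sleep(delay)
    raise RuntimeError(f"Service at {url} did not become ready after {retries} attempts")

# Example: wait_for_ready("http://localhost:47334/util/ping")

Calling a helper like this from setUpClass fails fast with a clear error if the server never comes up, instead of letting every test time out individually.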


__init__.py

Source: __init__.py (GitHub)



"""
@author: Julien Zoubian
@organization: CPPM / LAM
@license: Gnu Public Licence
@contact: zoubian@cppm.in2p3.fr
Initialize the TIPS test.
"""
import test_unit
import test_preintegration
import test_integration
import test_integration_pp
import test_fullsystem
import test_unitAxesim
import test_Verify

def run():
    test_unit.unittest.TextTestRunner(verbosity=2).run(test_unit.tips_utest())  # runs OK
    test_preintegration.unittest.TextTestRunner(verbosity=2).run(test_preintegration.tips_pretest())  # runs OK
    test_integration.unittest.TextTestRunner(verbosity=2).run(test_integration.tips_inttest())  # runs OK
    #test_integration_pp.unittest.TextTestRunner(verbosity=2).run(test_integration.tips_inttest_pp())
    #test_fullsystem.unittest.TextTestRunner(verbosity=2).run(test_integration.tips_fulltest())
    test_unitAxesim.unittest.TextTestRunner(verbosity=2).run(test_unitAxesim.tips_atest())  # runs OK...
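The run() helper above assumes that each imported module exposes a factory function (for example test_integration.tips_inttest()) returning a unittest.TestSuite. The names in the sketch below are hypothetical and only illustrate the shape of such a factory.

# Hypothetical sketch of a suite factory like tips_inttest(); the test case
# name and assertion are placeholders illustrating the pattern run() relies on.
import unittest

class ExampleIntegrationTest(unittest.TestCase):
    def test_pipeline_runs(self):
        self.assertTrue(True)  # placeholder assertion

def tips_inttest():
    suite = unittest.TestSuite()
    suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(ExampleIntegrationTest))
    return suite

if __name__ == "__main__":
    unittest.TextTestRunner(verbosity=2).run(tips_inttest())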


Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

