How to use read_configuration method in Behave

Best Python code snippet using behave

jarvis_configuration_manager.py

Source:jarvis_configuration_manager.py Github

copy

Full Screen

# -*- coding: utf-8 -*-
"""Jarvis configuration manager.

Command-line helpers that read a JSON configuration file, embed the files it
references (SQL, DDL, Markdown) as Base64 strings, and send the result to the
Jarvis API in order to check, create or deploy a configuration.
"""
import os
import sys
import requests
import base64
import re
import shutil
import json
from pathlib import Path
from subprocess import check_output
import platform
import ctypes
from jarvis_sdk import jarvis_config
from jarvis_sdk import jarvis_auth
from jarvis_sdk import jarvis_misc
from jarvis_sdk import sql_dag_generator


def display_configuration_help(command, jarvis_configuration, firebase_user):
    """Fetch and print the help text of ``command`` from the Jarvis API.

    Args:
        command: Command name; the resource requested is ``command + "_help"``.
        jarvis_configuration: Mapping providing ``jarvis_api_endpoint`` and
            ``perform_ssl_verification``.
        firebase_user: Mapping providing ``userId`` and ``idToken``.

    Returns:
        False when an exception occurs while preparing or sending the request,
        True otherwise (including when the API answers with a non-200 status,
        in which case the error body is printed).
    """
    try:
        # The UID is not sent, but a missing "userId" key makes the call fail
        # here and return False through the handler below.
        uid = firebase_user["userId"]

        url = jarvis_configuration["jarvis_api_endpoint"] + \
            "configuration/v2/help"
        data = {
            "payload": {
                "resource_type": "help",
                "resource": command + "_help"
            }
        }
        headers = {
            "Content-type": "application/json",
            "Authorization": "Bearer " + firebase_user["idToken"]
        }

        r = requests.post(
            url,
            headers=headers,
            data=json.dumps(data),
            verify=jarvis_configuration["perform_ssl_verification"])

        if r.status_code != 200:
            print("\nError : %s\n" % str(r.content, "utf-8"))
        else:
            response = r.json()
            print(response["payload"]["help"])

    except Exception as ex:
        print("Error while trying to contact Jarvis API ...")
        print(ex)
        return False

    return True


def process_sql_query(read_configuration, input_conf_file):
    """Read the SQL file referenced by a configuration and Base64-encode it.

    The SQL file name is taken from ``read_configuration["sql_file"]`` and is
    resolved relative to the directory of ``input_conf_file``.

    Args:
        read_configuration: Parsed configuration; must contain ``sql_file``.
        input_conf_file: Path of the configuration file.

    Returns:
        The Base64-encoded file content as a ``str``, or None when the host OS
        is not recognised or the file cannot be read.
    """
    # Get path
    #
    filepath = os.path.dirname(input_conf_file)
    print("File path : {}".format(filepath))

    # Pick the path separator matching the host system
    #
    host_system = jarvis_misc.check_platform()
    path_element = None
    if (host_system == "Linux") or (host_system == "Darwin"):
        path_element = "/"
    elif host_system == "Windows":
        path_element = "\\"
    else:
        print("Host OS unknown, cannot process SQL file reading.")
        return None

    if (filepath is None) or (filepath == ""):
        sql_full_filename = read_configuration["sql_file"]
    else:
        sql_full_filename = filepath + path_element + \
            read_configuration["sql_file"]

    print("SQL file path : {}".format(sql_full_filename))

    try:
        # Read the content of the file, UTF-8 converted
        # NOTE(review): presumably read_file_as_utf_8 returns bytes, since the
        # result is passed straight to b64encode -- confirm in jarvis_misc.
        #
        read_sql_file = jarvis_misc.read_file_as_utf_8(sql_full_filename)

        # Convert SQL query into Base64
        #
        return str(base64.b64encode(read_sql_file), "utf-8")

    except Exception as ex:
        # The original concatenated the exception to a str, which raised
        # TypeError from inside this handler.
        print("Error while reading SQL file : {}".format(ex))

    return None


def process_configuration_file(input_conf_file, read_configuration=None):
    """Load a configuration file and embed the files it references.

    When ``read_configuration`` is None ("legacy mode") the file is read and
    parsed as JSON. The configuration is then completed in place:

    * ``direct_execution`` is forced to True unless it is already a bool;
    * for ``table-to-storage``, the SQL file is stored Base64-encoded under
      ``sql``;
    * for ``storage-to-tables``, the global Markdown file, the DDL file of each
      table and the Markdown file of each table are Base64-encoded.

    Args:
        input_conf_file: Path of the configuration file.
        read_configuration: Already parsed configuration, or None.

    Returns:
        The processed configuration, or None when a required file is missing
        or cannot be parsed.
    """
    # Legacy mode
    #
    if read_configuration is None:

        # Check if the file exists
        #
        if os.path.isfile(input_conf_file) is False:
            print("File \"%s\" does not exists." % input_conf_file)
            return None

        # Read file and parse it as JSON
        #
        try:
            # Read the content of the file, UTF-8 converted
            #
            file_read = jarvis_misc.read_file_as_utf_8(input_conf_file)
            read_configuration = json.loads(file_read)

        except Exception as ex:
            print("Error while parsing JSON configuration file : {}".format(input_conf_file))
            print(ex)
            return None

    # Add direct_execution if not present
    # NOTE(review): nesting was lost in the collapsed listing; this step is
    # assumed to apply to both legacy and pre-parsed configurations.
    #
    if not isinstance(read_configuration.get("direct_execution"), bool):
        read_configuration["direct_execution"] = True

    # Get global path of the configuration file
    #
    configuration_absolute_pathname = jarvis_misc.get_path_from_file(
        input_conf_file)

    # Special processing for "table-to-storage"
    #
    if read_configuration["configuration_type"] == "table-to-storage":

        sql_query = process_sql_query(read_configuration, input_conf_file)
        if sql_query is None:
            return None

        read_configuration["sql"] = sql_query

    # Special processing for "storage-to-tables"
    #
    elif read_configuration["configuration_type"] == "storage-to-tables":

        # Process global Markdown file
        #
        try:
            doc_md = configuration_absolute_pathname + \
                read_configuration["doc_md"]
            print("Global Markdown file provided : {}".format(doc_md))

            try:
                with open(doc_md, "r") as f:
                    read_md_file = f.read()
                    read_md_file = bytes(read_md_file, "utf-8")
                    read_configuration["doc_md"] = str(
                        base64.b64encode(read_md_file), "utf-8")

            except Exception as ex:
                # The original concatenated the exception to a str, which
                # raised TypeError and escaped the enclosing KeyError handler.
                print("Error while reading Markdown file : {}".format(ex))
                return None

        except KeyError:
            print("No global Markdown file provided. Continuing ...")

        # Process Destination
        #
        try:
            for destination in read_configuration["destinations"]:
                for table in destination["tables"]:

                    # Manage DDL Mode
                    #
                    # 3 modes are supported :
                    # file | header | file_template
                    #
                    # If mode is "file" the attribute "ddl_file" must be present
                    # If mode is "file_template", the attribute "ddl_file_template" must be present
                    #
                    if ("ddl_mode" not in table) or (table["ddl_mode"] == "file"):

                        # Pre-set so the error message below never refers to
                        # an unbound name or to the previous table's file.
                        ddl_file = table.get("ddl_file")
                        try:
                            ddl_file = configuration_absolute_pathname + table["ddl_file"]
                            print("Processing DDL file : {}".format(ddl_file))

                            # Read the content of the file, UTF-8 converted
                            #
                            payload = jarvis_misc.read_file_as_utf_8(ddl_file)

                            # Try to parse the file as JSON to make sure there is no syntax error
                            #
                            json.loads(payload)

                            # Finally, Base 64 encode to get it ready for transfer
                            #
                            table["ddl_infos"] = str(base64.b64encode(payload), "utf-8")

                        except Exception as ex:
                            print("Error while parsing DDL file : {}".format(ddl_file))
                            print(ex)
                            return None

                    # Process Markdown file
                    # optional
                    #
                    try:
                        doc_md = configuration_absolute_pathname + table["doc_md"]
                        print("Processing table Markdown file : {}".format(doc_md))

                        # Read the content of the file, UTF-8 converted
                        #
                        read_doc_md = jarvis_misc.read_file_as_utf_8(doc_md)
                        table["doc_md"] = str(base64.b64encode(read_doc_md), "utf-8")

                    except Exception as ex:
                        # Best effort: a missing table Markdown file is not fatal.
                        print("Cannot process table Markdown file. Continuing ... : {}".format(ex))

        except Exception as ex:
            print("Error while processing destinations / tables : {}".format(ex))
            return None

    return read_configuration


def check_configuration(
        input_conf_file=None,
        jarvis_configuration=None,
        read_configuration=None,
        firebase_user=None):
    """Ask the Jarvis API to validate a configuration.

    Args:
        input_conf_file: Path of the configuration file.
        jarvis_configuration: Mapping providing ``jarvis_api_endpoint``,
            ``client_ssl_certificate`` and ``perform_ssl_verification``.
        read_configuration: Already parsed configuration, or None to read
            ``input_conf_file``.
        firebase_user: Mapping providing ``userId`` and ``idToken``.

    Returns:
        True when the API accepts the configuration or answers 404 (JSON
        Schema not registered yet); False on any other status, on a
        configuration that cannot be processed, or on request failure.
    """
    # Process configuration file
    #
    read_configuration = process_configuration_file(input_conf_file, read_configuration=read_configuration)
    if read_configuration is None:
        # The reason was already printed; do not send a null resource.
        return False

    # Call API
    #
    try:
        url = jarvis_configuration["jarvis_api_endpoint"] + "configuration/v2"
        data = {
            "payload": {
                "resource_type": "check-configuration",
                "resource": read_configuration,
                "uid": firebase_user["userId"]
            }
        }
        headers = {
            "Content-type": "application/json",
            "Authorization": "Bearer " + firebase_user["idToken"]}

        r = requests.post(
            url,
            headers=headers,
            data=json.dumps(data),
            cert=jarvis_configuration["client_ssl_certificate"],
            verify=jarvis_configuration["perform_ssl_verification"])

        if r.status_code == 404:
            # Special case : if the configuration JSON Schema is not found, we let pass until we can complete the JSON Schema database
            #
            print("\nConfiguration JSON Schema not found in JARVIS Platform.")
            return True
        elif r.status_code != 200:
            print("\nError(s) : \n%s\n" % str(r.content, "utf-8"))
            return False
        else:
            response = r.json()
            print(response["payload"]["message"])
            return True

    except Exception as ex:
        print("Error while trying to contact Jarvis API ...")
        print(ex)
        return False


def get_project_profile_from_configuration(
        input_conf_file=None,
        jarvis_configuration=None,
        read_configuration=None,
        firebase_user=None):
    """Ask the Jarvis API which GCP project a configuration belongs to.

    Args:
        input_conf_file: Path of the configuration file.
        jarvis_configuration: Mapping providing ``jarvis_api_endpoint`` and
            ``perform_ssl_verification``.
        read_configuration: Already parsed configuration, or None to read
            ``input_conf_file``.
        firebase_user: Mapping providing ``userId`` and ``idToken``.

    Returns:
        The ``message`` field of the API payload, or None when the project is
        not found, the configuration cannot be processed, the API reports an
        error or the request fails.
    """
    # Process configuration file
    #
    if read_configuration is None:
        read_configuration = process_configuration_file(input_conf_file, read_configuration=read_configuration)
        if read_configuration is None:
            # The reason was already printed; do not send a null resource.
            return None

    # Call API
    #
    try:
        url = jarvis_configuration["jarvis_api_endpoint"] + "configuration/v2"
        data = {
            "payload": {
                "resource_type": "get-gcp-project-id",
                "resource": read_configuration,
                "uid": firebase_user["userId"]
            }
        }
        headers = {
            "Content-type": "application/json",
            "Authorization": "Bearer " + firebase_user["idToken"]}

        r = requests.post(
            url,
            headers=headers,
            data=json.dumps(data),
            verify=jarvis_configuration["perform_ssl_verification"])

        if r.status_code == 404:
            # Not found
            #
            print("\nGCP Project ID not found for your configuration")
            return None
        elif r.status_code != 200:
            print("\nError(s) : \n%s\n" % str(r.content, "utf-8"))
            return None
        else:
            response = r.json()
            return response["payload"]["message"]

    except Exception as ex:
        print("Error while trying to contact Jarvis API ...")
        print(ex)
        return None


def deploy_configuration(
        input_conf_file,
        jarvis_configuration,
        firebase_user,
        gcp_cf_deploy,
        project_profile=None,
        read_configuration=None,
        jarvis_sdk_version=None,
        no_dag_execution=False):
    """Send a configuration to the Jarvis API for deployment.

    The configuration is tagged with ``client_type`` and ``client_version``.
    For ``vm-launcher`` configurations in direct execution, the user is asked
    whether the DAG must be executed after upload, unless ``no_dag_execution``
    is set.

    Args:
        input_conf_file: Path of the configuration file.
        jarvis_configuration: Mapping providing ``jarvis_api_endpoint`` and
            ``perform_ssl_verification``.
        firebase_user: Mapping providing ``userId`` and ``idToken``.
        gcp_cf_deploy: Whether the associated Cloud Function must be deployed.
        project_profile: Project profile forwarded to the API.
        read_configuration: Already parsed configuration, or None to read
            ``input_conf_file``.
        jarvis_sdk_version: Version stored under ``client_version``.
        no_dag_execution: When True, never ask to execute the DAG.

    Returns:
        True when the API answers 200, False otherwise.
    """
    # Process configuration file
    #
    if read_configuration is None:
        read_configuration = process_configuration_file(input_conf_file)
        if read_configuration is None:
            # The original went on and crashed when subscripting None.
            return False

    # Tag configuration with :
    # client_type
    # client_version
    #
    read_configuration["client_type"] = "jarvis-sdk"
    read_configuration["client_version"] = jarvis_sdk_version

    # Ask if the user wants to launch an execution upon successful upload
    # Will work only for direct execution DAGs
    #
    execute_dag = False
    if no_dag_execution is False:
        try:
            if read_configuration["direct_execution"] is True:
                if read_configuration["configuration_type"] in ["vm-launcher"]:
                    while True:
                        print("Do you want to execute the DAG associated with your configuration ? y/n. Press enter for \"n\" : ", end='', flush=True)
                        user_value = input()

                        if len(user_value) == 0:
                            user_value = "n"

                        if user_value == "y":
                            execute_dag = True
                            break
                        elif user_value == "n":
                            execute_dag = False
                            break

        except Exception:
            # Any problem while asking means "do not execute".
            execute_dag = False

    # Do we need to deploy the associated CF ?
    #
    if gcp_cf_deploy is True:
        print("\nThe GCP Cloud Function associated with your configuration deployment will be deployed if needed.")
        print("Please wait up to 2 minutes for full deployment.")
    else:
        print("\nThe GCP Cloud Function associated to your configuration WILL NOT be deployed.")

    # Call API
    #
    try:
        url = jarvis_configuration["jarvis_api_endpoint"] + "configuration/v2"
        data = {
            "payload": {
                "resource": read_configuration,
                "project_profile": project_profile,
                "uid": firebase_user["userId"],
                "deploy_cf": gcp_cf_deploy,
                "execute_dag": execute_dag
            }
        }
        headers = {
            "Content-type": "application/json",
            "Authorization": "Bearer " + firebase_user["idToken"]}

        r = requests.put(
            url,
            headers=headers,
            data=json.dumps(data),
            verify=jarvis_configuration["perform_ssl_verification"])

        if r.status_code != 200:
            print("\nError : %s\n" % str(r.content, "utf-8"))
            return False
        else:
            response = r.json()
            print(response["payload"]["message"])
            return True

    except Exception as ex:
        print("Error while trying to contact Jarvis API ...")
        print(ex)
        return False


def create_configuration(configuration_type, output_file, jarvis_configuration, firebase_user):
    """Download a configuration template and write it to ``output_file``.

    Args:
        configuration_type: Type of configuration requested from the API.
        output_file: Path of the file to write.
        jarvis_configuration: Mapping providing ``jarvis_api_endpoint`` and
            ``perform_ssl_verification``.
        firebase_user: Mapping providing ``userId`` and ``idToken``.

    Returns:
        True when the template was written, False otherwise.
    """
    # Some information
    #
    print("Configuration type : {}".format(configuration_type))
    print("Output file : {}".format(output_file))

    # Call API
    #
    try:
        url = jarvis_configuration["jarvis_api_endpoint"] + "configuration/v2"
        data = {
            "payload": {
                "resource_type": "configuration-type",
                "resource": configuration_type,
                "uid": firebase_user["userId"]
            }
        }
        headers = {
            "Content-type": "application/json",
            "Authorization": "Bearer " + firebase_user["idToken"]}

        r = requests.post(
            url,
            headers=headers,
            data=json.dumps(data),
            verify=jarvis_configuration["perform_ssl_verification"])

        if r.status_code != 200:
            print("\nError : %s\n" % str(r.content, "utf-8"))
            return False
        else:
            response = r.json()
            # The template is delivered Base64-encoded in the payload.
            config = str(base64.b64decode(
                response["payload"]["content"]), "utf-8")
            with open(output_file, mode='w') as file:
                file.write(config)
            return True

    except Exception as ex:
        print("Error while trying to contact Jarvis API ...")
        print(ex)
        return False


def check_table_to_table(input_configuration):
    """Return True when the configuration file is of type ``table-to-table``."""
    # Process configuration file
    #
    read_configuration = process_configuration_file(input_configuration)
    if read_configuration is None:
        return False

    try:
        if read_configuration["configuration_type"] == "table-to-table":
            return True

    except Exception as ex:
        print("Error while parsing configuration file.")
        print(ex)

    return False


def check_sts_or_stt(input_configuration):
    """Return True for ``storage-to-storage`` or ``storage-to-tables`` files."""
    # Process configuration file
    #
    read_configuration = process_configuration_file(input_configuration)
    if read_configuration is None:
        return False

    try:
        if read_configuration["configuration_type"] in ("storage-to-storage", "storage-to-tables"):
            return True

    except Exception as ex:
        print("Error while parsing configuration file.")
        print(ex)

    return False


def process_cf_deployment(input_configuration, jarvis_configuration, no_gcp_cf_deploy):
    """Decide whether the associated Cloud Function must be deployed.

    Args:
        input_configuration: Path of the configuration file.
        jarvis_configuration: Mapping that may provide the
            ``cf_deploy_auto_sts_stt`` flag.
        no_gcp_cf_deploy: When True, deployment is disabled.

    Returns:
        False when deployment is disabled or declined by the user; True when
        the configuration is neither STS nor STT, when automatic deployment is
        enabled, or when the user accepts.
    """
    if no_gcp_cf_deploy is True:
        return False

    if check_sts_or_stt(input_configuration) is not True:
        return True

    try:
        if jarvis_configuration["cf_deploy_auto_sts_stt"] is True:
            return True
    except Exception:
        # Flag absent or unusable: fall back to asking the user.
        pass

    # If we are here, we need to ask the user
    #
    while True:
        print("Do you want to perform the deployment of the associated Cloud Function ? y/n. Press enter for \"y\" : ", end='', flush=True)
        user_value = input()

        if len(user_value) == 0:
            user_value = "y"

        if user_value == "y":
            return True
        elif user_value == "n":
            return False


def get_configuration_version(input_configuration):
    """Return the ``version`` of a configuration file, defaulting to ``"1"``."""
    # Process configuration file
    #
    read_configuration = process_configuration_file(input_configuration)
    if read_configuration is None:
        return "1"

    try:
        return read_configuration["version"]
    except Exception:
        return "1"


def process_project_profile(
        input_conf_file,
        jarvis_configuration,
        firebase_user):
    """Resolve the project profile, asking the user when the API has none.

    Returns:
        The project profile, or None when the user selection fails.
    """
    project_profile = get_project_profile_from_configuration(
        input_conf_file=input_conf_file,
        jarvis_configuration=jarvis_configuration,
        firebase_user=firebase_user)

    if (project_profile is None) or (project_profile == ""):

        # Get list of project profiles open to the user and ask him to pick one
        #
        ret_code, project_profile = jarvis_misc.choose_project_profiles(
            jarvis_configuration, firebase_user)
        if ret_code is False:
            return None

    else:
        # NOTE(review): nesting was lost in the collapsed listing; this branch
        # is assumed to pair with the outer "if" -- confirm against upstream.
        print("\nProject profile used : {}\n".format(
            project_profile))

    return project_profile


def process_configuration_context(
        input_configuration,
        jarvis_configuration,
        firebase_user,
        context_choice=None):
    """Apply a project context to a configuration file.

    ``{{FD_ENV}}``, ``{{FD_ACCOUNT}}`` and the ``{{variable}}`` placeholders
    found in the configuration are replaced with values of the chosen context,
    then ``configuration_name`` and ``configuration_id`` are rewritten.

    Args:
        input_configuration: Path of the configuration file.
        jarvis_configuration: Jarvis SDK configuration.
        firebase_user: Authenticated user.
        context_choice: Context identifier; when None the user is asked.

    Returns:
        A ``(success, configuration)`` tuple: ``(False, None)`` on failure,
        ``(True, None)`` when the choice is ``NO_CONTEXT``, otherwise
        ``(True, processed_configuration)``.
    """
    # Read RAW configuration
    #
    read_configuration = process_configuration_file(input_configuration)
    if read_configuration is None:
        return False, None

    # Check if a context has been specified on command line.
    #
    if context_choice is None:

        # Ask the user to choose the project context
        # Only available context will be presented
        #
        ret_code, context_choice = jarvis_misc.choose_context(
            jarvis_configuration=jarvis_configuration,
            firebase_user=firebase_user)
        if ret_code is False:
            return False, None

    # No context management
    #
    if context_choice.strip() == "NO_CONTEXT":
        return True, None

    # Send back the choice to Tailer API and return the project context
    #
    ret_code, context_data = jarvis_misc.retrieve_context(
        jarvis_configuration=jarvis_configuration,
        firebase_user=firebase_user,
        context_id=context_choice.strip())
    if ret_code is False:
        return False, None

    # Transform configuration to string
    #
    configuration_string = json.dumps(read_configuration, indent=4)

    # Parse the user's configuration file and apply context
    #
    try:
        configuration_string = configuration_string.replace("{{FD_ENV}}", context_data["environment"])
        configuration_string = configuration_string.replace("{{FD_ACCOUNT}}", context_data["account"])

        # Regular variables
        # A placeholder that is the whole JSON string value is replaced, quotes
        # included, by the JSON form of the context value.
        #
        regex = '"{{([^{]*)}}"'
        result = re.findall(regex, configuration_string)

        for variable in result:
            try:
                print("Parsing variable : {}".format(variable))
                configuration_string = configuration_string.replace(
                    '"{{' + variable + '}}"',
                    json.dumps(context_data["parameters"][variable]["value"], indent=4))
            except Exception:
                # Best effort: unknown variables are left untouched here.
                pass

        # Variables embedded within strings
        # This will replace STRING, INT and FLOAT types only
        #
        regex = "{{([^{]*)}}"
        result = re.findall(regex, configuration_string)

        for variable in result:
            try:
                print("Parsing variable : {}".format(variable))
                if context_data["parameters"][variable]["type"] in ["string", "integer", "float"]:
                    configuration_string = configuration_string.replace(
                        "{{" + variable + "}}",
                        json.dumps(context_data["parameters"][variable]["value"], indent=4).replace("\"", ""))
                else:
                    configuration_string = configuration_string.replace(
                        "{{" + variable + "}}",
                        "VARIABLE_TYPE_NOT_SUPPORTED_PLEASE_CHECK_CONTEXT")
            except Exception as ex:
                print("Error while applying context on the configuration : {}".format(str(ex)))

        read_configuration = json.loads(configuration_string)

        # Process configuration ID and configuration name
        #
        read_configuration["configuration_name"] = read_configuration["configuration_id"]
        read_configuration["configuration_id"] = \
            read_configuration["account"].strip() \
            + "_" + context_data["configuration_id"].strip() \
            + "_" + read_configuration["configuration_id"].strip()

    except Exception as ex:
        print("Error while parsing configuration : {}".format(str(ex)))
        return False, None

    return True, read_configuration


def process(args, jarvis_sdk_version):
    """Entry point of the configuration manager.

    Dispatches on ``args.command`` (``deploy``, ``create`` or ``check``);
    ``args.arguments[1]`` is either ``help`` or the command's main argument.

    Args:
        args: Parsed command line exposing ``command``, ``arguments`` and
            ``no_gcp_cf_deploy``.
        jarvis_sdk_version: SDK version forwarded to the deployment.

    Returns:
        The boolean result of the executed command.
    """
    print("Jarvis Configuration Manager.")

    # Get configuration
    #
    jarvis_configuration = jarvis_config.get_jarvis_configuration_file()

    # Get firebase user
    #
    firebase_user = jarvis_auth.get_refreshed_firebase_user(
        jarvis_configuration)

    if args.command == "deploy":
        if len(args.arguments) >= 2:
            if args.arguments[1] is not None:
                if args.arguments[1] == "help":
                    return display_configuration_help(args.command, jarvis_configuration, firebase_user)
                else:
                    # Get optional flags
                    #
                    cmd_line_context = None
                    cmd_line_no_launch = False
                    for remainder_index in range(2, len(args.arguments)):
                        if args.arguments[remainder_index].strip() == "--context":
                            try:
                                cmd_line_context = args.arguments[remainder_index + 1].strip()
                            except Exception:
                                print("\nError : Please specify a context.\n")
                                return False
                        elif args.arguments[remainder_index].strip() == "--no-launch":
                            cmd_line_no_launch = True

                    # Retrieve configuration version
                    #
                    configuration_version = get_configuration_version(input_configuration=args.arguments[1])
                    print("Configuration version : {}".format(configuration_version))

                    # Process context for configuration version >= 2
                    #
                    read_configuration = None
                    if configuration_version == "2":
                        ret_code, read_configuration = process_configuration_context(
                            input_configuration=args.arguments[1],
                            jarvis_configuration=jarvis_configuration,
                            firebase_user=firebase_user,
                            context_choice=cmd_line_context)
                        if ret_code is False:
                            print("Error while processing configuration context.")
                            return False

                    # DEBUG
                    #
                    try:
                        print("Configuration id : {}".format(read_configuration["configuration_id"]))
                    except Exception:
                        pass

                    # Special check for TABLE-TO-TABLE (DAG Generator) configuration
                    # If so, we need to process the configuration file
                    #
                    if check_table_to_table(args.arguments[1]) is True:
                        print("Processing table-to-table type configuration ...")
                        sql_dag_generator.process(
                            configuration_file=args.arguments[1],
                            read_configuration=read_configuration,
                            jarvis_sdk_version=jarvis_sdk_version)
                        return True

                    # Check if the configuration is valid
                    #
                    if check_configuration(
                            input_conf_file=args.arguments[1],
                            jarvis_configuration=jarvis_configuration,
                            read_configuration=read_configuration,
                            firebase_user=firebase_user) is False:
                        return False

                    # Process Cloud Function deployment
                    #
                    # Only for STS and STT
                    #
                    gcp_cf_deploy = process_cf_deployment(
                        input_configuration=args.arguments[1],
                        jarvis_configuration=jarvis_configuration,
                        no_gcp_cf_deploy=args.no_gcp_cf_deploy)

                    # Check if GCP Project ID is present
                    #
                    project_profile = None
                    if (configuration_version == "1") or (read_configuration is None):
                        project_profile = process_project_profile(
                            input_conf_file=args.arguments[1],
                            jarvis_configuration=jarvis_configuration,
                            firebase_user=firebase_user)
                        if project_profile is None:
                            return False
                    else:
                        project_profile = get_project_profile_from_configuration(
                            input_conf_file=args.arguments[1],
                            jarvis_configuration=jarvis_configuration,
                            read_configuration=read_configuration,
                            firebase_user=firebase_user)
                        if project_profile is None:
                            return False

                    print("Attempting to deploy regular configuration...")

                    # DEBUG
                    #
                    try:
                        print("Configuration id : {}".format(read_configuration["configuration_id"]))
                    except Exception:
                        pass

                    return deploy_configuration(
                        args.arguments[1],
                        jarvis_configuration,
                        firebase_user,
                        gcp_cf_deploy,
                        project_profile=project_profile,
                        read_configuration=read_configuration,
                        jarvis_sdk_version=jarvis_sdk_version,
                        no_dag_execution=cmd_line_no_launch)
            else:
                # The original applied "%" to a string without placeholder,
                # which raised TypeError instead of printing the message.
                print("Argument unknown.")
                return False
        else:
            return display_configuration_help(args.command, jarvis_configuration, firebase_user)

    elif args.command == "create":
        if len(args.arguments) >= 2:
            if args.arguments[1] is not None:
                if args.arguments[1] == "help":
                    return display_configuration_help(args.command, jarvis_configuration, firebase_user)
                else:
                    # retrieve output_filename
                    #
                    try:
                        output_filename = args.arguments[2]
                    except Exception:
                        output_filename = args.arguments[1] + ".json"

                    return create_configuration(args.arguments[1], output_filename, jarvis_configuration, firebase_user)
            else:
                print("Argument unknown.")
                return False
        else:
            return display_configuration_help(args.command, jarvis_configuration, firebase_user)

    elif args.command == "check":
        if len(args.arguments) >= 2:
            if args.arguments[1] is not None:
                if args.arguments[1] == "help":
                    return display_configuration_help(args.command, jarvis_configuration, firebase_user)
                else:
                    return check_configuration(
                        input_conf_file=args.arguments[1],
                        jarvis_configuration=jarvis_configuration,
                        firebase_user=firebase_user)
            else:
                print("Argument unknown.")
                return False
        else:
            return display_configuration_help(args.command, jarvis_configuration, firebase_user)

    # NOTE(review): the source listing is truncated after this point ("...");
    # any further handling in the original file is not visible here.

Full Screen

Full Screen

test_loaders.py

Source:test_loaders.py Github

copy

Full Screen

...10from celery.loaders import base, default11from celery.loaders.app import AppLoader12from celery.utils.imports import NotAPackage13class DummyLoader(base.BaseLoader):14 def read_configuration(self):15 return {'foo': 'bar', 'imports': ('os', 'sys')}16class test_loaders:17 def test_get_loader_cls(self):18 assert loaders.get_loader_cls('default') is default.Loader19class test_LoaderBase:20 message_options = {'subject': 'Subject',21 'body': 'Body',22 'sender': 'x@x.com',23 'to': 'y@x.com'}24 server_options = {'host': 'smtp.x.com',25 'port': 1234,26 'user': 'x',27 'password': 'qwerty',28 'timeout': 3}29 def setup(self):30 self.loader = DummyLoader(app=self.app)31 def test_handlers_pass(self):32 self.loader.on_task_init('foo.task', 'feedface-cafebabe')33 self.loader.on_worker_init()34 def test_now(self):35 assert self.loader.now(utc=True)36 assert self.loader.now(utc=False)37 def test_read_configuration_no_env(self):38 assert base.BaseLoader(app=self.app).read_configuration(39 'FOO_X_S_WE_WQ_Q_WE') is None40 def test_autodiscovery(self):41 with patch('celery.loaders.base.autodiscover_tasks') as auto:42 auto.return_value = [Mock()]43 auto.return_value[0].__name__ = 'moo'44 self.loader.autodiscover_tasks(['A', 'B'])45 assert 'moo' in self.loader.task_modules46 self.loader.task_modules.discard('moo')47 def test_import_task_module(self):48 assert sys == self.loader.import_task_module('sys')49 def test_init_worker_process(self):50 self.loader.on_worker_process_init()51 m = self.loader.on_worker_process_init = Mock()52 self.loader.init_worker_process()53 m.assert_called_with()54 def test_config_from_object_module(self):55 self.loader.import_from_cwd = Mock()56 self.loader.config_from_object('module_name')57 self.loader.import_from_cwd.assert_called_with('module_name')58 def test_conf_property(self):59 assert self.loader.conf['foo'] == 'bar'60 assert self.loader._conf['foo'] == 'bar'61 assert self.loader.conf['foo'] == 'bar'62 def test_import_default_modules(self):63 def 
modnames(l):64 return [m.__name__ for m in l]65 self.app.conf.imports = ('os', 'sys')66 assert (sorted(modnames(self.loader.import_default_modules())) ==67 sorted(modnames([os, sys])))68 def test_import_default_modules_with_exception(self):69 """ Make sure exceptions are not silenced since this step is prior to70 setup logging. """71 def trigger_exception(**kwargs):72 raise ImportError('Dummy ImportError')73 from celery.signals import import_modules74 x = import_modules.connect(trigger_exception)75 self.app.conf.imports = ('os', 'sys')76 with pytest.raises(ImportError):77 self.loader.import_default_modules()78 import_modules.disconnect(x)79 def test_import_from_cwd_custom_imp(self):80 imp = Mock(name='imp')81 self.loader.import_from_cwd('foo', imp=imp)82 imp.assert_called()83 def test_cmdline_config_ValueError(self):84 with pytest.raises(ValueError):85 self.loader.cmdline_config_parser(['broker.port=foobar'])86class test_DefaultLoader:87 @patch('celery.loaders.base.find_module')88 def test_read_configuration_not_a_package(self, find_module):89 find_module.side_effect = NotAPackage()90 l = default.Loader(app=self.app)91 with pytest.raises(NotAPackage):92 l.read_configuration(fail_silently=False)93 @patch('celery.loaders.base.find_module')94 @mock.environ('CELERY_CONFIG_MODULE', 'celeryconfig.py')95 def test_read_configuration_py_in_name(self, find_module):96 find_module.side_effect = NotAPackage()97 l = default.Loader(app=self.app)98 with pytest.raises(NotAPackage):99 l.read_configuration(fail_silently=False)100 @patch('celery.loaders.base.find_module')101 def test_read_configuration_importerror(self, find_module):102 default.C_WNOCONF = True103 find_module.side_effect = ImportError()104 l = default.Loader(app=self.app)105 with pytest.warns(NotConfigured):106 l.read_configuration(fail_silently=True)107 default.C_WNOCONF = False108 l.read_configuration(fail_silently=True)109 def test_read_configuration(self):110 from types import ModuleType111 class 
ConfigModule(ModuleType):112 pass113 configname = os.environ.get('CELERY_CONFIG_MODULE') or 'celeryconfig'114 celeryconfig = ConfigModule(bytes_if_py2(configname))115 celeryconfig.imports = ('os', 'sys')116 prevconfig = sys.modules.get(configname)117 sys.modules[configname] = celeryconfig118 try:119 l = default.Loader(app=self.app)120 l.find_module = Mock(name='find_module')121 settings = l.read_configuration(fail_silently=False)122 assert settings.imports == ('os', 'sys')123 settings = l.read_configuration(fail_silently=False)124 assert settings.imports == ('os', 'sys')125 l.on_worker_init()126 finally:127 if prevconfig:128 sys.modules[configname] = prevconfig129 def test_read_configuration_ImportError(self):130 sentinel = object()131 prev, os.environ['CELERY_CONFIG_MODULE'] = (132 os.environ.get('CELERY_CONFIG_MODULE', sentinel), 'daweqew.dweqw',133 )134 try:135 l = default.Loader(app=self.app)136 with pytest.raises(ImportError):137 l.read_configuration(fail_silently=False)138 l.read_configuration(fail_silently=True)139 finally:140 if prev is not sentinel:141 os.environ['CELERY_CONFIG_MODULE'] = prev142 else:143 os.environ.pop('CELERY_CONFIG_MODULE', None)144 def test_import_from_cwd(self):145 l = default.Loader(app=self.app)146 old_path = list(sys.path)147 try:148 sys.path.remove(os.getcwd())149 except ValueError:150 pass151 celery = sys.modules.pop('celery', None)152 sys.modules.pop('celery.local', None)...

Full Screen

Full Screen

privgit.py

Source:privgit.py Github

copy

Full Screen

...57 This 'flat' syntax exists for the purposes of fetching data from very limited sources (like the Foreman's host params58 in pre 1.22 version)59 '''60 def fail(ex): raise ex61 def read_configuration(key, d):62 return d[key] if key in d else fail(SaltConfigurationError("option: {} not found in configuration".format(key)))63 def deflatten_pillar():64 privgit_pattern = re.compile("privgit_\S+_\S+")65 d = []66 for e in (e for e in pillar if privgit_pattern.match(e) is not None):67 value = pillar[e]68 keys = e[8:].split('_', 1)69 d.append({keys[0]: {70 keys[1]: value71 }})72 return d73 def previous_pillar():74 r = pillar75 key = kwargs['lookup_key'] if 'lookup_key' in kwargs else ext_name76 for each in key.split(DEFAULT_TARGET_DELIM):77 if each in r:78 r = r[each]79 else:80 log.error("key: %s (part: %s) not found in pillar", key, each)81 return []82 if isinstance(r, list):83 return r84 else:85 log.error("key: %s must be a list", key)86 return []87 def write_file(path, content, perms):88 __salt__['file.write'](path, content)89 __salt__['file.set_mode'](path, perms)90 ext_name = __virtualname__91 opt_url = 'url'92 opt_branch = 'branch'93 opt_env = 'env'94 opt_root = 'root'95 opt_privkey = 'privkey'96 opt_pubkey = 'pubkey'97 opt_privkey_loc = 'privkey_location'98 opt_pubkey_loc = 'pubkey_location'99 privgit_args = kwargs['repositories'] if 'repositories' in kwargs else []100 privgit_pillar = previous_pillar()101 privgit_deflatten_pillar = deflatten_pillar()102 repositories = collections.OrderedDict()103 repositories = __utils__['common.merge'](privgit_args, repositories)104 repositories = __utils__['common.merge'](privgit_pillar, repositories)105 repositories = __utils__['common.merge'](privgit_deflatten_pillar, repositories)106 log.info("Using following repositories: %s", repositories)107 cachedir = __salt__['config.get']('cachedir')108 ret = {}109 for repository_name, repository_opts in repositories.items():110 if opt_privkey in repository_opts and opt_pubkey in 
repository_opts:111 parent = os.path.join(cachedir, ext_name, minion_id, repository_name)112 if not os.path.exists(parent):113 os.makedirs(parent)114 priv_location = os.path.join(parent, 'priv.key')115 pub_location = os.path.join(parent, 'pub.key')116 # will override if already exists117 write_file(priv_location, repository_opts[opt_privkey], "600")118 write_file(pub_location, repository_opts[opt_pubkey], "644")119 repository_opts[opt_privkey_loc] = priv_location120 repository_opts[opt_pubkey_loc] = pub_location121 privgit_url = read_configuration(opt_url, repository_opts)122 privgit_branch = read_configuration(opt_branch, repository_opts)123 privgit_env = read_configuration(opt_env, repository_opts)124 privgit_root = read_configuration(opt_root, repository_opts)125 privgit_privkey = read_configuration(opt_privkey_loc, repository_opts)126 privgit_pubkey = read_configuration(opt_pubkey_loc, repository_opts)127 repo = {'{} {}'.format(privgit_branch, privgit_url): [128 {"env": privgit_env},129 {"root": privgit_root},130 {"privkey": privgit_privkey},131 {"pubkey": privgit_pubkey}]}132 log.debug("generated private git configuration: %s", repo)133 try:134 # workaround, otherwise GitFS doesn't perform fetch and "remote ref does not exist"135 local_opts = copy.deepcopy(__opts__)136 local_opts['__role'] = 'minion'137 loaded_pillar = salt.loader.pillars(local_opts, __salt__)138 ret = loaded_pillar['git'](minion_id, pillar, repo)139 except Exception as e:140 log.exception("Fatal error in privgit, for: %s %s, repository will be omitted", privgit_branch, privgit_url)...

Full Screen

Full Screen

test_configuration.py

Source:test_configuration.py Github

copy

Full Screen

from pyups import configuration
from pyups.configuration import Configuration
from unittest.mock import patch
from unittest import mock


def __set_up_no_encryption(s3_bucket: str, repository_path) -> Configuration:
    """
    This is used by the tests to set up a configuration for a repository **without** encryption.
    This is done by mocking the user inputs and running `configuration.get_configuration`.

    Parameters
    ----------
    s3_bucket
        The name of the S3 bucket that the repository will be configured with. The back up
        copies would normally be uploaded to this bucket.
    repository_path
        The path to the repository of files to be backed up. This repository must not yet
        have been initialised (i.e. it has not been previously configured).

    Returns
    -------
    The `Configuration` that was created for the repository.
    """
    with mock.patch('builtins.input') as mock_input:
        # Successive input() calls receive these answers in order: the bucket
        # name, then "n" (presumably declining encryption -- confirm against
        # the prompts in `configuration.get_configuration`).
        mock_input.side_effect = [s3_bucket, "n"]
        return configuration.get_configuration(repository_path=repository_path)


def test_configuration_no_encryption(tmp_path) -> None:
    """
    This tests setting up a configuration for a repository, without encryption.
    """
    created_configuration = __set_up_no_encryption(s3_bucket="abc",
                                                   repository_path=tmp_path)
    assert created_configuration == Configuration(s3_bucket="abc",
                                                  encryption_password=None)


def test_read_configuration_no_encryption(tmp_path) -> None:
    """
    This tests reading back a configuration for a repository (without extra encryption) after
    it has been set up.
    """
    created_configuration = __set_up_no_encryption(s3_bucket="def",
                                                   repository_path=tmp_path)
    # No input is mocked for this second call; it runs against the same
    # repository path that was just configured.
    read_configuration = configuration.get_configuration(
        repository_path=tmp_path)
    assert created_configuration == read_configuration


def __set_up_with_encryption(s3_bucket: str, password: str,
                             repository_path) -> Configuration:
    """
    This is used by the tests to set up a configuration for a repository **with** encryption.
    This is done by mocking the user inputs and running `configuration.get_configuration`.

    Parameters
    ----------
    s3_bucket
        The name of the S3 bucket that the repository will be configured with. The back
        up copies would normally be uploaded to this bucket.
    password
        The encryption password that will be configured for the repository.
    repository_path
        The path to the repository of files to be backed up. This repository
        **must not** yet have been initialised (i.e. it has not been previously
        configured).

    Returns
    -------
    The `Configuration` that was created for the repository.
    """
    with mock.patch('builtins.input') as mock_input:
        # Successive input() calls receive the bucket name, then "y"
        # (presumably opting in to encryption -- confirm against the prompts).
        mock_input.side_effect = [s3_bucket, "y"]
        with mock.patch('getpass.getpass') as mock_getpass:
            # The password prompt is mocked separately from input().
            mock_getpass.return_value = password
            return configuration.get_configuration(
                repository_path=repository_path)


def __read_configuration_with_encryption(password: str,
                                         repository_path) -> Configuration:
    """
    This is used by the tests to read back the configuration for a repository
    configured with encryption. It is expected that the user will be prompted for
    the encryption password when reading the configuration (the method will mock
    the user input).

    Parameters
    ----------
    password
        The encryption password for the repository. This **must** match the
        password that was set during configuration.
    repository_path
        The path to the repository of files to be backed up. This repository
        **must** have already been configured.

    Returns
    -------
    The `Configuration` that was read back from the repository.
    """
    with mock.patch('getpass.getpass') as mock_getpass:
        mock_getpass.return_value = password
        return configuration.get_configuration(repository_path=repository_path)


def test_configuration_with_encryption(tmp_path) -> None:
    """
    Tests creating the initial configuration for a repository, with encryption.
    """
    created_configuration = __set_up_with_encryption(s3_bucket="bucket",
                                                     password="abc",
                                                     repository_path=tmp_path)
    assert created_configuration.s3_bucket == "bucket"
    # Only checks that some password value was stored, not its exact form.
    assert created_configuration.encryption_password is not None


def test_read_configuration_with_encryption(tmp_path) -> None:
    """
    Tests reading back the configuration for an existing repository, with encryption.
    """
    created_configuration = __set_up_with_encryption(s3_bucket="bucket2",
                                                     password="test",
                                                     repository_path=tmp_path)
    read_configuration = __read_configuration_with_encryption(
        repository_path=tmp_path, password="test")
    assert created_configuration == read_configuration

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Behave automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful