Best Python code snippet using gabbi_python
config_helper_test.py
Source:config_helper_test.py  
...55    with tempfile.NamedTemporaryFile('w') as f:56      f.write(SCHEMA.encode('utf_8'))57      f.flush()58      schema = config_helper.Schema.load_yaml_file(f.name)59      schema_from_str = config_helper.Schema.load_yaml(SCHEMA)60      self.assertEqual(schema.properties, schema_from_str.properties)61      self.assertEqual(schema.required, schema_from_str.required)62  def test_required(self):63    schema = config_helper.Schema.load_yaml(SCHEMA)64    self.assertEqual({'propertyString', 'propertyPassword'},65                     set(schema.required))66  def test_bad_required(self):67    schema_yaml = """68                  properties:69                    propertyA:70                      type: string71                  required:72                  - propertyA73                  - propertyB74                  - propertyC75                  """76    self.assertRaisesRegexp(config_helper.InvalidSchema,77                            r'propertyB, propertyC',78                            config_helper.Schema.load_yaml,79                            schema_yaml)80  def test_types_and_defaults(self):81    schema = config_helper.Schema.load_yaml(SCHEMA)82    self.assertEqual(83        {'propertyString',84         'propertyStringWithDefault',85         'propertyInt',86         'propertyIntWithDefault',87         'propertyInteger',88         'propertyIntegerWithDefault',89         'propertyNumber',90         'propertyNumberWithDefault',91         'propertyBoolean',92         'propertyBooleanWithDefault',93         'propertyPassword'},94        set(schema.properties))95    self.assertEqual(str,96                     schema.properties['propertyString'].type)97    self.assertIsNone(schema.properties['propertyString'].default)98    self.assertEqual(str,99                     schema.properties['propertyStringWithDefault'].type)100    self.assertEqual('DefaultString',101                     schema.properties['propertyStringWithDefault'].default)102    self.assertEqual(int, 
schema.properties['propertyInt'].type)103    self.assertIsNone(schema.properties['propertyInt'].default)104    self.assertEqual(int,105                     schema.properties['propertyIntWithDefault'].type)106    self.assertEqual(3,107                     schema.properties['propertyIntWithDefault'].default)108    self.assertEqual(int,109                     schema.properties['propertyInteger'].type)110    self.assertIsNone(schema.properties['propertyInteger'].default)111    self.assertEqual(int,112                     schema.properties['propertyIntegerWithDefault'].type)113    self.assertEqual(6,114                     schema.properties['propertyIntegerWithDefault'].default)115    self.assertEqual(float, schema.properties['propertyNumber'].type)116    self.assertIsNone(schema.properties['propertyNumber'].default)117    self.assertEqual(float,118                     schema.properties['propertyNumberWithDefault'].type)119    self.assertEqual(1.0,120                     schema.properties['propertyNumberWithDefault'].default)121    self.assertEqual(bool,122                     schema.properties['propertyBoolean'].type)123    self.assertIsNone(schema.properties['propertyBoolean'].default)124    self.assertEqual(bool,125                     schema.properties['propertyBooleanWithDefault'].type)126    self.assertEqual(False,127                     schema.properties['propertyBooleanWithDefault'].default)128    self.assertEqual(str, schema.properties['propertyPassword'].type)129    self.assertIsNone(schema.properties['propertyPassword'].default)130    self.assertEqual('GENERATED_PASSWORD',131                     schema.properties['propertyPassword'].xtype)132  def test_invalid_name(self):133    self.assertRaises(134        config_helper.InvalidSchema,135        lambda: config_helper.Schema.load_yaml(136        """137        properties:138          bad/name:139            type: string140        """))141  def test_required(self):142    schema = 
config_helper.Schema.load_yaml(SCHEMA)143    self.assertTrue(schema.properties['propertyString'].required)144    self.assertTrue(schema.properties['propertyPassword'].required)145    self.assertFalse(schema.properties['propertyInt'].required)146    self.assertFalse(schema.properties['propertyNumberWithDefault'].required)147  def test_schema_properties_matching(self):148    schema = config_helper.Schema.load_yaml(SCHEMA)149    self.assertEqual([schema.properties['propertyPassword']],150                     schema.properties_matching({151                         'x-google-marketplace': {152                             'type':153                             'GENERATED_PASSWORD'154                         }155                     }))156    self.assertEqual([schema.properties['propertyInt'],157                      schema.properties['propertyIntWithDefault']],158                     schema.properties_matching({159                         'type': 'int',160                     }))161  def test_name_type(self):162    schema = config_helper.Schema.load_yaml(163        """164        properties:165          n:166            type: string167            x-google-marketplace:168              type: NAME169        """)170    self.assertIsNotNone(schema.properties['n'])171  def test_namespace_type(self):172    schema = config_helper.Schema.load_yaml(173        """174        properties:175          ns:176            type: string177            x-google-marketplace:178              type: NAMESPACE179        """)180    self.assertIsNotNone(schema.properties['ns'])181  def test_image_type(self):182    schema = config_helper.Schema.load_yaml(183        """184        properties:185          i:186            type: string187            x-google-marketplace:188              type: IMAGE189        """)190    self.assertIsNotNone(schema.properties['i'])191  def test_password(self):192    schema = config_helper.Schema.load_yaml(193        """194        properties:195          pw:196            
type: string197            x-google-marketplace:198              type: GENERATED_PASSWORD199        """)200    self.assertEqual(10, schema.properties['pw'].password.length)201    self.assertEqual(False, schema.properties['pw'].password.include_symbols)202    self.assertEqual(True, schema.properties['pw'].password.base64)203    schema = config_helper.Schema.load_yaml(204        """205        properties:206          pw:207            type: string208            x-google-marketplace:209              type: GENERATED_PASSWORD210              generatedPassword:211                length: 5212                includeSymbols: true213                base64: false214        """)215    self.assertEqual(5, schema.properties['pw'].password.length)216    self.assertEqual(True, schema.properties['pw'].password.include_symbols)217    self.assertEqual(False, schema.properties['pw'].password.base64)218  def test_int_type(self):219    schema = config_helper.Schema.load_yaml(220        """221        properties:222          pi:223            type: int224        """)225    self.assertEqual(5, schema.properties['pi'].str_to_type('5'))226  def test_number_type(self):227    schema = config_helper.Schema.load_yaml(228        """229        properties:230          pn:231            type: number232        """)233    self.assertEqual(5.2, schema.properties['pn'].str_to_type('5.2'))234  def test_boolean_type(self):235    schema = config_helper.Schema.load_yaml(236        """237        properties:238          pb:239            type: boolean240        """)241    self.assertEqual(True, schema.properties['pb'].str_to_type('true'))242    self.assertEqual(True, schema.properties['pb'].str_to_type('True'))243    self.assertEqual(True, schema.properties['pb'].str_to_type('yes'))244    self.assertEqual(True, schema.properties['pb'].str_to_type('Yes'))245    self.assertEqual(False, schema.properties['pb'].str_to_type('false'))246    self.assertEqual(False, schema.properties['pb'].str_to_type('False'))247    
self.assertEqual(False, schema.properties['pb'].str_to_type('no'))248    self.assertEqual(False, schema.properties['pb'].str_to_type('No'))249    self.assertRaises(250        config_helper.InvalidValue,251        lambda: schema.properties['pb'].str_to_type('bad'))252  def test_invalid_default_type(self):253    self.assertRaises(254        config_helper.InvalidSchema,255        lambda: config_helper.Schema.load_yaml(256            """257            properties:258              pn:259                type: number260                default: abc261            """))262  def test_property_matches_definition(self):263    schema = config_helper.Schema.load_yaml(264        """265        properties:266          propertyInt:267            type: int268          propertyPassword:269            type: string270            x-google-marketplace:271              type: GENERATED_PASSWORD272        """)273    self.assertTrue(schema.properties['propertyInt'].matches_definition(274        {'name': 'propertyInt'}))275    self.assertFalse(schema.properties['propertyInt'].matches_definition(276        {'name': 'propertyPassword'}))277    self.assertTrue(schema.properties['propertyInt'].matches_definition(278        {'type': 'int'}))279    self.assertFalse(schema.properties['propertyInt'].matches_definition(280        {'type': 'string'}))281    self.assertFalse(schema.properties['propertyInt'].matches_definition(282        {'x-google-marketplace': {'type': 'GENERATED_PASSWORD'}}))283    self.assertTrue(schema.properties['propertyPassword'].matches_definition(284        {'x-google-marketplace': {'type': 'GENERATED_PASSWORD'}}))285    self.assertTrue(schema.properties['propertyPassword'].matches_definition(286        {'type': 'string',287         'x-google-marketplace': {288           'type': 'GENERATED_PASSWORD'289         }}))290  def test_defaults_bad_type(self):291    self.assertRaises(292        config_helper.InvalidSchema,293        lambda: config_helper.Schema.load_yaml(294            
"""295            properties:296              p1:297                type: string298                default: 10299            """))300  def test_service_account(self):301    schema = config_helper.Schema.load_yaml(302        """303        properties:304          sa:305            type: string306            x-google-marketplace:307              type: SERVICE_ACCOUNT308              serviceAccount:309                roles:310                - type: ClusterRole311                  rulesType: PREDEFINED312                  rulesFromRoleName: cluster-admin313                - type: ClusterRole314                  rulesType: PREDEFINED315                  rulesFromRoleName: admin316                - type: Role317                  rulesType: PREDEFINED318                  rulesFromRoleName: edit319                - type: Role320                  rulesType: PREDEFINED321                  rulesFromRoleName: view322                - type: ClusterRole323                  rulesType: CUSTOM324                  rules:325                  - apiGroups: ['v1']326                    resources: ['Secret']327                    verbs: ['*']328                  - apiGroups: ['v1']329                    resources: ['ConfigMap']330                    verbs: ['*']331                - type: Role332                  rulesType: CUSTOM333                  rules:334                  - apiGroups: ['apps/v1']335                    resources: ['Deployment']336                    verbs: ['*']337                  - apiGroups: ['apps/v1']338                    resources: ['StatefulSet']339                    verbs: ['*']340        """)341    sa = schema.properties['sa'].service_account342    self.assertIsNotNone(sa)343    self.assertListEqual(['cluster-admin', 'admin'],344                         sa.predefined_cluster_roles())345    self.assertListEqual(['edit', 'view'],346                         sa.predefined_roles())347    self.assertListEqual(348        [349            [{'apiGroups': ['v1'],350   
           'resources': ['Secret'],351              'verbs': ['*']},352             {'apiGroups': ['v1'],353              'resources': ['ConfigMap'],354              'verbs': ['*']},355            ]356        ],357        sa.custom_cluster_role_rules())358    self.assertListEqual(359        [360            [{'apiGroups': ['apps/v1'],361              'resources': ['Deployment'],362              'verbs': ['*']},363             {'apiGroups': ['apps/v1'],364              'resources': ['StatefulSet'],365              'verbs': ['*']},366            ]367        ],368        sa.custom_role_rules())369  def test_storage_class(self):370    schema = config_helper.Schema.load_yaml(371        """372        properties:373          sc:374            type: string375            x-google-marketplace:376              type: STORAGE_CLASS377              storageClass:378                type: SSD379        """)380    self.assertIsNotNone(schema.properties['sc'].storage_class)381    sc = schema.properties['sc'].storage_class382    self.assertTrue(sc.ssd)383  def test_reporting_secret(self):384    schema = config_helper.Schema.load_yaml(385        """386        properties:387          rs:388            type: string389            x-google-marketplace:390              type: REPORTING_SECRET391        """)392    self.assertIsNotNone(schema.properties['rs'].reporting_secret)393  def test_unknown_type(self):394    self.assertRaises(395        config_helper.InvalidSchema,396        lambda: config_helper.Schema.load_yaml(397        """398        properties:399          unk:400            type: string401            x-google-marketplace:402              type: UNKNOWN403        """))404if __name__ == 'main':...init_demo.py
Source:init_demo.py  
# ... (lines 1-16 of init_demo.py are not shown in this excerpt; presumably
# they hold the file header and the `logging` / `yaml` imports used below)
from pygenie.client import Genie
from pygenie.conf import GenieConf

logging.basicConfig(level=logging.WARNING)
LOGGER = logging.getLogger(__name__)


def load_yaml(yaml_file):
    """Parse the YAML file at ``yaml_file`` and return the resulting object.

    Args:
        yaml_file: Path of the YAML document to read.

    Returns:
        Whatever the document deserialises to.
    """
    with open(yaml_file) as _file:
        # yaml.load() without an explicit Loader is rejected by PyYAML >= 6.
        # safe_load() works on every PyYAML release and only builds plain
        # data types.
        return yaml.safe_load(_file)


genie_conf = GenieConf()
genie_conf.genie.url = "http://genie:8080"
genie = Genie(genie_conf)

# --- Applications -----------------------------------------------------------
# Logger.warn() is a deprecated alias (removed in Python 3.13); warning() with
# lazy %-arguments emits the same text.
hadoop_application = load_yaml("applications/hadoop271.yml")
hadoop_application_id = genie.create_application(hadoop_application)
LOGGER.warning("Created Hadoop 2.7.1 application with id = [%s]", hadoop_application_id)

spark_163_application = load_yaml("applications/spark163.yml")
spark_163_application_id = genie.create_application(spark_163_application)
LOGGER.warning("Created Spark 1.6.3 application with id = [%s]", spark_163_application_id)

spark_201_application = load_yaml("applications/spark201.yml")
spark_201_application_id = genie.create_application(spark_201_application)
LOGGER.warning("Created Spark 2.0.1 application with id = [%s]", spark_201_application_id)

# --- Commands ---------------------------------------------------------------
hadoop_command = load_yaml("commands/hadoop271.yml")
hadoop_command_id = genie.create_command(hadoop_command)
LOGGER.warning("Created Hadoop command with id = [%s]", hadoop_command_id)

hdfs_command = load_yaml("commands/hdfs271.yml")
hdfs_command_id = genie.create_command(hdfs_command)
LOGGER.warning("Created HDFS command with id = [%s]", hdfs_command_id)

yarn_command = load_yaml("commands/yarn271.yml")
yarn_command_id = genie.create_command(yarn_command)
LOGGER.warning("Created Yarn command with id = [%s]", yarn_command_id)

spark_163_shell_command = load_yaml("commands/sparkShell163.yml")
spark_163_shell_command_id = genie.create_command(spark_163_shell_command)
LOGGER.warning("Created Spark 1.6.3 Shell command with id = [%s]", spark_163_shell_command_id)

spark_163_submit_command = load_yaml("commands/sparkSubmit163.yml")
spark_163_submit_command_id = genie.create_command(spark_163_submit_command)
LOGGER.warning("Created Spark 1.6.3 Submit command with id = [%s]", spark_163_submit_command_id)

spark_201_shell_command = load_yaml("commands/sparkShell201.yml")
spark_201_shell_command_id = genie.create_command(spark_201_shell_command)
LOGGER.warning("Created Spark 2.0.1 Shell command with id = [%s]", spark_201_shell_command_id)

spark_201_submit_command = load_yaml("commands/sparkSubmit201.yml")
spark_201_submit_command_id = genie.create_command(spark_201_submit_command)
LOGGER.warning("Created Spark 2.0.1 Submit command with id = [%s]", spark_201_submit_command_id)

# --- Command -> application links -------------------------------------------
genie.set_application_for_command(hadoop_command_id, [hadoop_application_id])
LOGGER.warning("Set applications for Hadoop command to = [%s]", hadoop_application_id)

genie.set_application_for_command(hdfs_command_id, [hadoop_application_id])
LOGGER.warning("Set applications for HDFS command to = [[%s]]", hadoop_application_id)

genie.set_application_for_command(yarn_command_id, [hadoop_application_id])
LOGGER.warning("Set applications for Yarn command to = [[%s]]", hadoop_application_id)

# Each Spark command is linked to the Hadoop application plus its own Spark
# application; the id lists are named once and reused for the call and the log.
spark_163_application_ids = [hadoop_application_id, spark_163_application_id]
spark_201_application_ids = [hadoop_application_id, spark_201_application_id]

genie.set_application_for_command(spark_163_shell_command_id, spark_163_application_ids)
LOGGER.warning("Set applications for Spark 1.6.3 Shell command to = [%s]", spark_163_application_ids)

genie.set_application_for_command(spark_163_submit_command_id, spark_163_application_ids)
LOGGER.warning("Set applications for Spark 1.6.3 Submit command to = [%s]", spark_163_application_ids)

genie.set_application_for_command(spark_201_shell_command_id, spark_201_application_ids)
LOGGER.warning("Set applications for Spark 2.0.1 Shell command to = [%s]", spark_201_application_ids)

genie.set_application_for_command(spark_201_submit_command_id, spark_201_application_ids)
LOGGER.warning("Set applications for Spark 2.0.1 Submit command to = [%s]", spark_201_application_ids)

# --- Clusters ---------------------------------------------------------------
prod_cluster = load_yaml("clusters/prod.yml")
prod_cluster_id = genie.create_cluster(prod_cluster)
LOGGER.warning("Created prod cluster with id = [%s]", prod_cluster_id)

test_cluster = load_yaml("clusters/test.yml")
test_cluster_id = genie.create_cluster(test_cluster)
LOGGER.warning("Created test cluster with id = [%s]", test_cluster_id)

# Both clusters receive the same commands, in the same order.
all_command_ids = [
    hadoop_command_id,
    hdfs_command_id,
    yarn_command_id,
    spark_163_shell_command_id,
    spark_201_shell_command_id,
    spark_163_submit_command_id,
    spark_201_submit_command_id,
]

# A fresh list is passed to each call, as in the original script.
genie.set_commands_for_cluster(prod_cluster_id, list(all_command_ids))
LOGGER.warning("Added all commands to the prod cluster with id = [%s]", prod_cluster_id)

genie.set_commands_for_cluster(test_cluster_id, list(all_command_ids))
LOGGER.warning("Added all commands to the test cluster with id = [%s]", test_cluster_id)
# Environment.py
Source:Environment.py  
# (line 1 of Environment.py is not shown in this excerpt; presumably it is the
# `import yaml` that load_yaml() below relies on -- TODO confirm)
import os


class Environment:
    """Read settings from the YAML files in the ``env`` directory.

    Each ``get_*`` method loads one of ``env/service.yaml``, ``env/path.yaml``
    or ``env/endpoint.yaml`` through :meth:`load_yaml` and returns the value
    stored under a single key. The file is re-read on every call.
    """

    def _setting(self, section, key):
        """Load ``env/<section>.yaml`` and return the value stored under ``key``.

        Raises:
            KeyError: if ``key`` is missing from the loaded mapping.
        """
        return self.load_yaml(section)[key]

    # --- env/service.yaml ---------------------------------------------------

    def get_service_host(self):
        """Return the ``host`` entry of the service settings."""
        return self._setting('service', 'host')

    def get_use_ssl(self):
        """Return the ``use_ssl`` entry of the service settings."""
        return self._setting('service', 'use_ssl')

    def get_service_port(self):
        """Return the ``port_webservice`` entry of the service settings."""
        return self._setting('service', 'port_webservice')

    def get_server_port(self):
        """Return the ``port_webserver`` entry of the service settings."""
        return self._setting('service', 'port_webserver')

    def get_service_enable_git(self):
        """Return True when the ``enable_git`` service setting equals 1."""
        return self._setting('service', 'enable_git') == 1

    def get_service_shout_out_liftime_days(self):
        """Return the ``shout_out_liftime_days`` entry of the service settings."""
        # The spelling "liftime" is the key name used in the YAML file.
        return self._setting('service', 'shout_out_liftime_days')

    def get_service_deleted_jira_boards(self):
        """Return the ``deleted_jira_boards`` entry of the service settings."""
        return self._setting('service', 'deleted_jira_boards')

    # --- env/path.yaml ------------------------------------------------------

    def get_path_log(self):
        """Return the ``log`` entry of the path settings."""
        return self._setting('path', 'log')

    def get_path_clf(self):
        """Return the ``clf`` entry of the path settings."""
        return self._setting('path', 'clf')

    def get_path_sync_state(self):
        """Return the ``sync_state`` entry of the path settings."""
        return self._setting('path', 'sync_state')

    # --- env/endpoint.yaml --------------------------------------------------

    def get_endpoint_git_projects(self):
        """Return the ``git_projects`` entry of the endpoint settings."""
        return self._setting('endpoint', 'git_projects')

    def get_endpoint_git_commits(self):
        """Return the ``git_commits`` entry of the endpoint settings."""
        return self._setting('endpoint', 'git_commits')

    def get_endpoint_git_private_token(self):
        """Return the ``git_private_token`` entry of the endpoint settings."""
        return self._setting('endpoint', 'git_private_token')

    def get_endpoint_confluence_user(self):
        """Return the ``confluence_user`` entry of the endpoint settings."""
        return self._setting('endpoint', 'confluence_user')

    def get_endpoint_confluence_password(self):
        """Return the ``confluence_password`` entry of the endpoint settings."""
        return self._setting('endpoint', 'confluence_password')

    def get_endpoint_confluence_host(self):
        """Return the ``confluence_host`` entry of the endpoint settings."""
        return self._setting('endpoint', 'confluence_host')

    def get_endpoint_jira_user(self):
        """Return the ``jira_user`` entry of the endpoint settings."""
        return self._setting('endpoint', 'jira_user')

    def get_endpoint_jira_password(self):
        """Return the ``jira_password`` entry of the endpoint settings."""
        return self._setting('endpoint', 'jira_password')

    def get_endpoint_jira_host(self):
        """Return the ``jira_host`` entry of the endpoint settings."""
        return self._setting('endpoint', 'jira_host')

    def get_endpoint_mongo_db_cloud(self):
        """Return the ``mongo_db_cloud`` entry of the endpoint settings."""
        return self._setting('endpoint', 'mongo_db_cloud')

    # --- YAML file access ---------------------------------------------------

    def load_yaml(self, name):
        """Parse ``env/<name>.yaml`` and return the loaded object.

        Args:
            name: File name without the ``.yaml`` suffix.
        """
        path = os.path.join("env", f"{name}.yaml")
        # The context manager closes the handle once parsing is done; the
        # file used to be opened and never closed.
        with open(path, "r", encoding='utf8') as handle:
            return yaml.load(handle, Loader=yaml.FullLoader)

    def store_yaml(self, name, data):
        path = os.path.join("env", f"{name}.yaml")
        file = open(path, "w", encoding='utf8')
        ...  # the remainder of store_yaml is cut off in this excerpt


# Learn to execute automation testing from scratch with LambdaTest Learning
# Hub. Right from setting up the prerequisites to run your first automation
# test, to following best practices and diving deeper into advanced test
# scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to
# help you be proficient with different test automation frameworks, e.g.
# Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
