How to use replace_param method in toolium

Best Python code snippet using toolium_python

prop.py

Source:prop.py Github

copy

Full Screen

...6 edited_prop = prop7 # zookeeper8 if server.server_type == ServerType.ZOOKEEPER:9 server_id = server.server_id10 edited_prop = replace_param('dataDir', f'{server.data_dir}', edited_prop)11 edited_prop = replace_param('clientPort', f'{server.client_port}', edited_prop)12 cluster_list = generate_zookeeper_cluster_list(server_id, servers)13 edited_prop = f'{edited_prop}\n' + '\n'.join(cluster_list) + '\n'14 # kafka15 elif server.server_type == ServerType.KAFKA:16 edited_prop = replace_param('broker.id', f'{server.server_id}', edited_prop)17 edited_prop = replace_param('log.dirs', f'{server.data_dir}', edited_prop)18 edited_prop = replace_param('listeners', f'{server.listeners}', edited_prop)19 edited_prop = replace_param('advertised.listeners', f'{server.advertised_listeners}', edited_prop)20 edited_prop = replace_param('zookeeper.connect', f'{server.zookeeper_connect}', edited_prop)21 edited_prop = replace_param('confluent.metrics.reporter.bootstrap.servers',22 f'{server.metrics_reporter_bootstrap_servers}', edited_prop)23 # schema-registry24 elif server.server_type == ServerType.SCHEMA_REGISTRY:25 edited_prop = replace_param('schema.registry.group.id', f'{server.group_id}', edited_prop)26 edited_prop = replace_param('host.name', f'{server.host_name}', edited_prop)27 edited_prop = replace_param('listeners', f'{server.listeners}', edited_prop)28 edited_prop = replace_param('kafkastore.bootstrap.servers', f'{server.bootstrap_servers}', edited_prop)29 # kafka-connect30 elif server.server_type == ServerType.KAFKA_CONNECT:31 edited_prop = replace_param('group.id', f'{server.group_id}', edited_prop)32 edited_prop = replace_param('rest.host.name', f'{server.listen_address}', edited_prop)33 edited_prop = replace_param('rest.port', f'{server.listen_port}', edited_prop)34 edited_prop = replace_param('rest.advertised.host.name', f'{server.advertised_listen_address}', edited_prop)35 edited_prop = replace_param('rest.advertised.port', f'{server.advertised_listen_port}', edited_prop)36 edited_prop = replace_param('bootstrap.servers', f'{server.bootstrap_servers}', edited_prop)37 edited_prop = replace_param('key.converter.schema.registry.url',38 f'{server.key_converter_schema_registry_url}', edited_prop)39 edited_prop = replace_param('value.converter.schema.registry.url',40 f'{server.value_converter_schema_registry_url}', edited_prop)41 edited_prop = replace_param('config.storage.topic', f'{server.config_storage_topic}', edited_prop)42 edited_prop = replace_param('offset.storage.topic', f'{server.offset_storage_topic}', edited_prop)43 edited_prop = replace_param('status.storage.topic', f'{server.status_storage_topic}', edited_prop)44 edited_prop = replace_param('plugin.path', f'{server.plugin_path}', edited_prop)45 # replicator46 elif server.server_type == ServerType.REPLICATOR:47 edited_prop = replace_param('group.id', f'{server.group_id}', edited_prop)48 edited_prop = replace_param('rest.host.name', f'{server.listen_address}', edited_prop)49 edited_prop = replace_param('rest.port', f'{server.listen_port}', edited_prop)50 edited_prop = replace_param('rest.advertised.host.name', f'{server.advertised_listen_address}', edited_prop)51 edited_prop = replace_param('rest.advertised.port', f'{server.advertised_listen_port}', edited_prop)52 edited_prop = replace_param('bootstrap.servers', f'{server.bootstrap_servers}', edited_prop)53 edited_prop = replace_param('key.converter.schema.registry.url',54 f'{server.key_converter_schema_registry_url}', edited_prop)55 edited_prop = replace_param('value.converter.schema.registry.url',56 f'{server.value_converter_schema_registry_url}', edited_prop)57 edited_prop = replace_param('config.storage.topic', f'{server.config_storage_topic}', edited_prop)58 edited_prop = replace_param('offset.storage.topic', f'{server.offset_storage_topic}', edited_prop)59 edited_prop = replace_param('status.storage.topic', f'{server.status_storage_topic}', edited_prop)60 edited_prop = replace_param('plugin.path', f'{server.plugin_path}', edited_prop)61 # kafka-rest62 elif server.server_type == ServerType.KAFKA_REST:63 edited_prop = replace_param('id', f'{server.server_id}', edited_prop)64 edited_prop = replace_param('listeners', f'{server.listeners}', edited_prop)65 edited_prop = replace_param('advertised.listeners', f'{server.advertised_listeners}', edited_prop)66 edited_prop = replace_param('bootstrap.servers', f'{server.bootstrap_servers}', edited_prop)67 edited_prop = replace_param('schema.registry.url', f'{server.schema_registry_url}', edited_prop)68 # ksqldb69 elif server.server_type == ServerType.KSQLDB:70 edited_prop = replace_param('ksql.service.id', f'{server.group_id}', edited_prop)71 edited_prop = replace_param('ksql.streams.state.dir', f'{server.data_dir}', edited_prop)72 edited_prop = replace_param('listeners', f'{server.listeners}', edited_prop)73 edited_prop = replace_param('ksql.advertised.listener', f'{server.advertised_listener}', edited_prop)74 edited_prop = replace_param('bootstrap.servers', f'{server.bootstrap_servers}', edited_prop)75 edited_prop = replace_param('ksql.schema.registry.url', f'{server.schema_registry_url}', edited_prop)76 write_file(f'output/properties/{server.file.properties}', edited_prop)77def create_control_center_prop_file(server, prop, connect_servers, replicator_servers, ksqldb_servers):78 edited_prop = prop79 # control-center80 if server.server_type == ServerType.CONTROL_CENTER:81 edited_prop = replace_param('confluent.controlcenter.id', f'{server.server_id}', edited_prop)82 edited_prop = replace_param('confluent.controlcenter.data.dir', f'{server.data_dir}', edited_prop)83 edited_prop = replace_param('bootstrap.servers', f'{server.bootstrap_servers}', edited_prop)84 edited_prop = replace_param('zookeeper.connect', f'{server.zookeeper_connect}', edited_prop)85 edited_prop = replace_param('confluent.controlcenter.schema.registry.url',86 f'{server.schema_registry_url}', edited_prop)87 # kafka-connect and replicator clusters88 connect_dict = get_sub_cluster_domain_url_dict(connect_servers)89 replicator_dict = get_sub_cluster_domain_url_dict(replicator_servers)90 connect_and_replicator_dict = {**replicator_dict, **connect_dict}91 for gid, urls in connect_and_replicator_dict.items():92 edited_prop = append_param(f'confluent.controlcenter.connect.{gid}.cluster', f'{urls}',93 f'### kafka-connect', edited_prop)94 # kafka-rest cluster95 edited_prop = replace_param('confluent.controlcenter.streams.cprest.url',96 f'{server.kafka_rest_url}', edited_prop)97 # ksqldb cluster98 ksqldb_group_list = get_unique_cluster_list(ksqldb_servers)99 ksqldb_domain_dict = get_sub_cluster_domain_url_dict(ksqldb_servers)100 ksqldb_address_dict = get_sub_cluster_address_url_dict(ksqldb_servers)101 for group in ksqldb_group_list:102 urls = ksqldb_domain_dict.get(group)103 ad_urls = ksqldb_address_dict.get(group)104 edited_prop = append_param(f'confluent.controlcenter.ksql.{group}.advertised.url', f'{ad_urls}',105 f'### ksqldb', edited_prop)106 edited_prop = append_param(f'confluent.controlcenter.ksql.{group}.url', f'{urls}',107 f'### ksqldb', edited_prop)...

Full Screen

Full Screen

weewx_mod.py

Source:weewx_mod.py Github

copy

Full Screen

...60 61 return indent, param, value, comment62 63 64 def replace_param(self, lines, keywd, p, old_val, new_val, limit=1):65 ''' Replace a parameter's value with a new one, using optional match '''66 def assemble(indent, param, value, comment):67 ''' Helper function for line reassembly '''68 return indent + param + ' = ' + value + comment + '\n'69 replaced = 070 i = 071 kwd_found = False72 while replaced < limit and i < len(lines):73 line = lines[i]74 if kwd_found == True: # Found the section with our keyword75 if line.find(p) >= 0 and line.find('=') >= 0:76 indent, param, value, comment = self.splitup(line)77 if old_val != '':78 if value.find(old_val) >= 0:79 lines[i] = assemble(indent, param, str(new_val), comment)80 replaced += 181 else:82 lines[i] = assemble(indent, param, str(new_val), comment)83 replaced += 184 elif line.find(keywd) >= 0:85 kwd_found = True86 i += 187 return replaced88 # Weewx station type lookup from driver89 def weewx_driver_lookup(self, station_type):90 # Find station driver by key (made case-insensitive)91 types = [d.upper() for d in self.config_dict.keys()]92 if station_type.upper() in types:93 idx = types.index(station_type.upper())94 return self.config_dict.values()[idx]95 else:96 return None97 def set_latlon(self, lat, lon):98 ''' Set values for lat/lon in class obj '''99 def check_val(val, extents):100 if val >= extents[0] and val <= extents[1]:101 return True102 return False103 # Check values prior to setting (ignore if out of range)104 if lat != "" and lon != "":105 if check_val(float(lat), [-90.0, 90.0]):106 self.pws_lat = float(lat)107 108 if check_val(float(lon), [-180.0, 180.0]):109 self.pws_lon = float(lon)110 111 def set_wu_cfg(self, wu_id, wu_pwd, rpdf='False', loc_str=''):112 ''' ''' 113 self.wu_station_id = wu_id114 self.wu_pwd = wu_pwd115 if rpdf == 'True': self.wu_rapidfire = 'True'116 else: self.wu_rapidfire = 'False'117 self.wu_location = loc_str118 def run_wee_config(self):119 ''' Update weewx config file using wee_conf utility (run 1st)'''120 121 cmd = [self.wee_config_script, 122 "--reconfigure", 123 "--driver=%s" % str(self.driver),124 "--longitude=%s" % str(self.pws_lon),125 "--latitude=%s" % str(self.pws_lat),126 "--location=%s" % str(self.wu_location),127 "--no-prompt",128 "--no-backup"]129 retcode = subprocess.call(cmd)130 if retcode != 0:131 self.logger.error("subprocess cmd '%s' returned %s\n" % (cmd,retcode))132 def update_config_file(self):133 ''' Update the weewx config file manually (run 2nd) ''' 134 # Grab all lines from input file and store in list135 lines = []136 with open(self.config_file, 'r') as infile:137 line = None138 while line != "":139 line = infile.readline()140 lines.append(line)141 # Modify other weewx station params manually (wee_config doesn't do these)142 self.replace_param(lines, '[Station]', 'model', '', self.pws_model)143 self.replace_param(lines, '[[Wunderground]]', 'enable', '', 'true')144 self.replace_param(lines, '[[Wunderground]]', 'station', '', self.wu_station_id)145 self.replace_param(lines, '[[Wunderground]]', 'password', '', '"'+self.wu_pwd+'"')146 self.replace_param(lines, '[[Wunderground]]', 'rapidfire', '', self.wu_rapidfire)147 self.replace_param(lines, '[[Wunderground]]', 'location', '', self.wu_location)148 ## Write out new config file (replace the input copy)149 with open(self.config_file, 'w') as outfile:150 for line in lines:151 outfile.write(line)152 self.logger.debug("%s Station params modified\n" % (self.config_file))153 def update_all(self):154 ''' '''155 # Run wee_config to modify the config file for us, where it can156 self.run_wee_config()157 # With the config file properly updated, we can modify it with the 158 # remaining WU-specific parameters159 self.update_config_file()160 161#### Main...

Full Screen

Full Screen

replace_prarm.py

Source:replace_prarm.py Github

copy

Full Screen

1class ReplaceParam:2 def replace_param(self,request_data,case_titledata):3 """将请求参数中的变量进行替换"""4 if isinstance(request_data, dict):5 for key, value in request_data.items():6 if "$" in str(value):7 if isinstance(value,str):8 # print("value is str",value)9 request_data[key] = case_titledata[value[2:-1]]10 else:11 # print("value is not str", value)12 self.replace_param(value,case_titledata)13 if isinstance(request_data,list):14 for i in request_data:15 if "$" in str(i):16 if isinstance(i,dict):17 # print("i is dict", i)18 self.replace_param(i,case_titledata)19 elif isinstance(i,list):20 # print("i is list", i)21 self.replace_param(i,case_titledata)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run toolium automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful