Best Python code snippet using autotest_python
IndicesService.py
Source:IndicesService.py  
```python
...
from elastichq.service import ClusterService, ConnectionService, HQService
from ..globals import REQUEST_TIMEOUT
class IndicesService:
    def get_indices_stats(self, cluster_name, indices_names=None):
        connection = ConnectionService().get_connection(cluster_name)
        return connection.indices.stats(index=indices_names, request_timeout=REQUEST_TIMEOUT)
    def get_indices(self, cluster_name, index_name=None):
        connection = ConnectionService().get_connection(cluster_name)
        return connection.indices.get(index=index_name or "_all", request_timeout=REQUEST_TIMEOUT)
    def delete_indices(self, cluster_name, index_name):
        connection = ConnectionService().get_connection(cluster_name)
        return connection.indices.delete(index=index_name, request_timeout=REQUEST_TIMEOUT)
    def create_index(self, cluster_name, index_name, settings=None):
        connection = ConnectionService().get_connection(cluster_name)
        return connection.indices.create(index=index_name, body=settings, request_timeout=REQUEST_TIMEOUT)
    def open_index(self, cluster_name, index_name):
        connection = ConnectionService().get_connection(cluster_name)
        return connection.indices.open(index=index_name, request_timeout=REQUEST_TIMEOUT)
    def close_index(self, cluster_name, index_name):
        connection = ConnectionService().get_connection(cluster_name)
        return connection.indices.close(index=index_name, request_timeout=REQUEST_TIMEOUT)
    def flush_index(self, cluster_name, index_name):
        connection = ConnectionService().get_connection(cluster_name)
        return connection.indices.flush(index=index_name, request_timeout=REQUEST_TIMEOUT)
    def refresh_index(self, cluster_name, index_name):
        connection = ConnectionService().get_connection(cluster_name)
        return connection.indices.refresh(index=index_name, request_timeout=REQUEST_TIMEOUT)
    def clear_cache(self, cluster_name, index_name):
        connection = ConnectionService().get_connection(cluster_name)
        return connection.indices.clear_cache(index=index_name, request_timeout=REQUEST_TIMEOUT)
    def get_alias(self, cluster_name, index_name):
        """
        Fetches alias definitions for an index, if passed in. For now, we ignore nested data inside of the alias payload, like filter terms.
        TODO: https://www.elastic.co/guide/en/elasticsearch/reference/2.0/indices-aliases.html#_examples_2
        :param cluster_name:
        :param index_name:
        :return:
        """
        connection = ConnectionService().get_connection(cluster_name)
        alias_defs = connection.indices.get_alias(index=index_name, request_timeout=REQUEST_TIMEOUT)
        aliases = []
        for index_name in alias_defs:
            aliases_as_dicts = alias_defs[index_name].get('aliases', None)
            alias_keys = list(aliases_as_dicts)
            if alias_keys:
                for key in alias_keys:
                    row = {'index_name': index_name, 'alias': key}
                    aliases.append(row)
        return aliases
    def remove_alias(self, cluster_name, index_name, alias_name):
        connection = ConnectionService().get_connection(cluster_name)
        return connection.indices.delete_alias(index_name, name=alias_name)
    def create_alias(self, cluster_name, index_name, alias_name):
        connection = ConnectionService().get_connection(cluster_name)
        return connection.indices.put_alias(index_name, name=alias_name)
    def force_merge(self, cluster_name, index_name):
        connection = ConnectionService().get_connection(cluster_name)
        return connection.indices.forcemerge(index=index_name, request_timeout=REQUEST_TIMEOUT)
    def get_mapping(self, cluster_name, index_name, mapping_name=None):
        # TODO: add options here, per: https://www.elastic.co/guide/en/elasticsearch/reference/6.x/indices-get-mapping.html#indices-get-mapping
        connection = ConnectionService().get_connection(cluster_name)
        return connection.indices.get_mapping(index=index_name, doc_type=mapping_name, request_timeout=REQUEST_TIMEOUT)
    def get_indices_summary(self, cluster_name, indices_names=None):
        """
        Returns a formatted representation of one/many indices.
        :param cluster_name:
        :param indices_names:
        :return:
        """
        connection = ConnectionService().get_connection(cluster_name)
        indices_stats = connection.indices.stats(index=indices_names, request_timeout=REQUEST_TIMEOUT)
        # get shard info
        cluster_state = ClusterService().get_cluster_state(cluster_name, metric="metadata", indices=indices_names)
        state_indices = jmespath.search("metadata.indices", cluster_state)
        cat = connection.cat.indices(format='json')
        show_dot_indices = HQService().get_settings(cluster_name).get('show_dot_indices')
        indices = []
        if state_indices:
            the_indices = indices_stats.get("indices", None)
            index_keys = list(the_indices.keys())
            for key in index_keys:
                if show_dot_indices is False and key.startswith(".") is True:
                    continue
                one_index = the_indices.get(key)
                index = {"index_name": key}
                index['health'] = [x['health'] for x in cat if x['index'] == key][0]
                index['docs'] = jmespath.search("primaries.docs.count", one_index)
                index['docs_deleted'] = jmespath.search("primaries.docs.deleted", one_index)
                index['size_in_bytes'] = jmespath.search("primaries.store.size_in_bytes", one_index)
                index['fielddata'] = {
                    'memory_size_in_bytes': jmespath.search("total.fielddata.memory_size_in_bytes", one_index)}
                index_state = state_indices.get(key)
                index['settings'] = {
                    'number_of_shards': int(jmespath.search("settings.index.number_of_shards", index_state) or 0),
                    "number_of_replicas": int(jmespath.search("settings.index.number_of_replicas", index_state) or 0)}
                index['state'] = index_state.get("state", None)
                indices.append(index)
        return sorted(indices, key=lambda k: k['index_name'])
    def get_shards(self, cluster_name, index_name):
        connection = ConnectionService().get_connection(cluster_name)
        shards = connection.cat.shards(index=index_name, format='json')
        return shards
    def expunge_deleted(self, cluster_name, index_name):
        connection = ConnectionService().get_connection(cluster_name)
        try:
            return connection.indices.forcemerge(index=index_name, params={"only_expunge_deletes": 1},
                                                 request_timeout=REQUEST_TIMEOUT)
        except:  # this will time out on large indices, so ignore.
            return
    def get_indices_templates(self, cluster_name):
        connection = ConnectionService().get_connection(cluster_name)
        return connection.indices.get_template()
    def get_indices_segments(self, cluster_name):
        connection = ConnectionService().get_connection(cluster_name)
        return connection.indices.segments()
    def get_indices_shard_stores(self, cluster_name):
        connection = ConnectionService().get_connection(cluster_name)
        return connection.indices.shard_stores()
    def get_indices_recovery(self, cluster_name):
        connection = ConnectionService().get_connection(cluster_name)
        return connection.indices.recovery()
    def copy_mapping(self, cluster_name, from_index, to_index):
        # check that destination does NOT contain a mapping
        dest_mapping_exists = IndicesService().get_mapping(cluster_name, to_index)
        if bool(dest_mapping_exists.get(to_index).get('mappings', None)):
            raise BadRequest(message='Index already contains a mapping!')
        else:
            source_mapping = IndicesService().get_mapping(cluster_name, from_index)
            connection = ConnectionService().get_connection(cluster_name)
            root_mapping = source_mapping[from_index]
            doc_type = list(root_mapping['mappings'].keys())[0]
            mapping_body = root_mapping['mappings'].get(doc_type, {})
            return connection.indices.put_mapping(doc_type=doc_type, body=mapping_body, index=to_index)
    def reindex(self, cluster_name, from_index, to_index):
        body = """
        {
          "source": {
            "index": "%s"
          },
          "dest": {
            "index": "%s"
          }
        }
        """ % (from_index, to_index)
        connection = ConnectionService().get_connection(cluster_name)
        connection.reindex(body=body, wait_for_completion=False)
        return
    def get_closed_indices(self, cluster_name):
        connection = ConnectionService().get_connection(cluster_name)
        cat_indices = connection.cat.indices(format='json')
        indices = []
        if cat_indices:
            for index in cat_indices:
                if index.get('status', "").startswith('close'):
                    indices.append(index)
        return indices
    def get_deleted_indices(self, cluster_name):
        """
        Only supported in ES v5+
        :param cluster_name:
        :return:
        """
        cluster_state = ClusterService().get_cluster_state(cluster_name, metric="metadata")
...
```
PoolWithMySQL.py
Source:PoolWithMySQL.py
```python
...
except Exception as e:
    print(e)

no_of_connections = [
conn.get_connection(),
conn.get_connection(),
conn.get_connection(),
conn.get_connection(),
conn.get_connection(),
conn.get_connection(),
conn.get_connection(),
conn.get_connection(),
conn.get_connection(),
conn.get_connection(),
conn.get_connection(),
conn.get_connection(),
]
while len(no_of_connections) > conn.pool_size:
    for i in no_of_connections:
        cursor = i.cursor(dictionary = True)
        cursor.execute('select * from user_details where id = 1;')
        for row in cursor:
            print(row)
else:
    print("testing")
# if conn.pool_size :
#         print("entered 11 th connection established")
# print(conn.pool_size)
# db = conn.get_connection()
# db1 = conn.get_connection()
# db2 = conn.get_connection()
# db3 = conn.get_connection()
# db4 = conn.get_connection()
# db5 = conn.get_connection()
# db6 = conn.get_connection()
# db7 = conn.get_connection()
# db8 = conn.get_connection()
# db9 = conn.get_connection()
# db10 = conn.get_connection()
# db11 = conn.get_connection()
# print(conn.pool_name, conn.pool_size)
# conn_obj = conn.get_connection()
# if conn_obj.is_connected():
#     db_info = conn_obj.get_server_info()
#     print("Connected to MySQL database using connection pool", db_info)
#     cursor = conn_obj.cursor()
#     cursor.execute("select database();")
#     record = cursor.fetchone()
...
```
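The pool construction itself is truncated above. For context, a typical setup with mysql-connector-python's MySQLConnectionPool might look like the sketch below; the host, credentials, database name, and pool size are placeholders, not values from the original script.

```python
# Hedged sketch of the kind of pool the truncated snippet relies on.
# Host, credentials, database, and pool_size are placeholders.
import mysql.connector.pooling

dbconfig = {
    "host": "localhost",
    "user": "root",
    "password": "secret",
    "database": "test_db",
}

try:
    # mysql-connector-python caps pool_size at 32; once every pooled connection
    # is checked out, get_connection() raises errors.PoolError.
    conn = mysql.connector.pooling.MySQLConnectionPool(
        pool_name="demo_pool", pool_size=10, **dbconfig)
except Exception as e:
    print(e)
```

Note that the while condition in the snippet never changes inside the loop, and if the pool is smaller than the twelve connections requested, get_connection() raises PoolError before the loop is ever reached, so the block reads best as a pool-exhaustion experiment rather than production code.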
jigsaw-moritz.py
Source:jigsaw-moritz.py
```python
from collections import defaultdict
def get_connection(p, d):
	return pieces[p][(d + pieces[p][-1]) % 4]
def rotate_piece(p):
	pieces[p][-1] = (pieces[p][-1]+1)%4
def rotate_connection_(p,c,d):
	for i in range(4):
		if get_connection(p,i+d) == c:
			pieces[p][-1] = (pieces[p][-1]+i)%4
def get_fitting_piece(p, d):
	idx_list = connections[get_connection(p, d)]
	if idx_list[0] == p:
		return idx_list[1]
	else:
		return idx_list[0]
north = 0
west = 1
south = 2
east = 3
n = int(raw_input())
connections = defaultdict(list)
pieces = []
corner = -1
error = False
for i in range(n):
	c = 0
	p = list(map(int, raw_input().split(" ")))
	for j in range(4):
		connections[p[j]].append(i)

		if p[j] == 0 and p[(j+1)%4] == 0:
			corner = i

	# add current rotation
	p.append(0)
	pieces.append(p)
if corner == -1:
	print("impossible")
	exit(0)
used = [False for _ in range(len(pieces))]
current_piece = corner
while get_connection(current_piece, north) != 0 or get_connection(current_piece, west) != 0:
	rotate_piece(current_piece)
solved = []
solved.append([])
solved[-1].append(current_piece)
used[current_piece] = True
pos = 0
for _ in range(n-1):
	if get_connection(current_piece, east) == 0:
		idx = get_fitting_piece(solved[-1][0], south)
		if used[idx] or current_piece == idx:
			error = True
			break
		# rotate piece
		while get_connection(idx, north) != get_connection(solved[-1][0], south):
			rotate_piece(idx)
		if get_connection(idx, west) != 0:
			error = True
			break
		solved.append([])
		solved[-1].append(idx)
		current_piece = idx
		used[idx] = True
		pos = 1
	else:
		idx = get_fitting_piece(current_piece, east)
		if used[idx] or idx == current_piece:
			error = True
			break
		while get_connection(idx, west) != get_connection(current_piece, east):
			rotate_piece(idx)
		if len(solved) > 1 and (len(solved[-2]) <= pos or get_connection(idx, north) != get_connection(solved[-2][pos], south) or get_connection(idx, north) == 0):
			#print("Top does not fit!")
			error = True
			break
		solved[-1].append(idx)
		current_piece = idx
		used[idx] = True
		pos += 1
test_len = len(solved[0])
for l in solved:
	if len(l) != test_len:
		error = True
if error:
	print("impossible")
else:
...
```
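The solver's tail (its success output) is truncated above. From the parsing code, the expected stdin appears to be a first line with the piece count n, then one line of four connector IDs per piece in the order north, west, south, east at rotation 0, with 0 marking a border edge and equal non-zero IDs marking edges that fit together. The sketch below shows a hypothetical 1x2 puzzle in that inferred format plus a compatibility note; both are illustrations derived from the code, not part of the original.

```python
# Hypothetical 1x2 puzzle in the inferred input format ("north west south east"
# per piece; 0 = border edge, 5 = the connector shared by the two pieces):
#
#   2
#   0 0 0 5
#   0 5 0 0
#
# The script targets Python 2 (raw_input); one way to run it unchanged:
#   python2 jigsaw-moritz.py < puzzle.txt
# Under Python 3, aliasing the input function at the top of the script suffices:
raw_input = input  # Python 3 compatibility shim, placed before n = int(raw_input())
```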
