Best Python code snippet using localstack_python
horizontal_shard.py
Source: horizontal_shard.py
# ext/horizontal_shard.py
# Copyright (C) 2005-2017 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php

"""Horizontal sharding support.

Defines a rudimental 'horizontal sharding' system which allows a Session to
distribute queries and persistence operations across multiple databases.

For a usage example, see the :ref:`examples_sharding` example included in
the source distribution.

"""

from .. import util
from ..orm.session import Session
from ..orm.query import Query

__all__ = ['ShardedSession', 'ShardedQuery']


class ShardedQuery(Query):
    """A Query that can be executed against one shard or several.

    The ``id_chooser`` and ``query_chooser`` callables are read from the
    owning session when the query is constructed.
    """

    def __init__(self, *args, **kwargs):
        super(ShardedQuery, self).__init__(*args, **kwargs)
        self.id_chooser = self.session.id_chooser
        self.query_chooser = self.session.query_chooser
        # None means "no single shard selected"; see set_shard().
        self._shard_id = None

    def set_shard(self, shard_id):
        """Return a copy of this query restricted to one shard ID.

        Every later operation on the returned query is issued against
        that single shard, regardless of other state.
        """
        pinned = self._clone()
        pinned._shard_id = shard_id
        return pinned

    def _execute_and_instances(self, context):
        """Execute the statement and return an iterator of instances.

        When a shard has been selected with :meth:`set_shard`, only that
        shard is queried.  Otherwise every shard ID yielded by
        ``query_chooser(self)`` is queried in turn and the results are
        concatenated in that order.
        """

        def _instances_from(shard_id):
            # Record the shard being queried on the query context.
            context.attributes['shard_id'] = shard_id
            conn = self._connection_from_session(
                mapper=self._mapper_zero(),
                shard_id=shard_id)
            result = conn.execute(context.statement, self._params)
            return self.instances(result, context)

        if self._shard_id is not None:
            return _instances_from(self._shard_id)

        combined = []
        for shard_id in self.query_chooser(self):
            combined.extend(_instances_from(shard_id))
        # if some kind of in memory 'sorting'
        # were done, this is where it would happen
        return iter(combined)

    def _get_impl(self, ident, fallback_fn):
        """Delegate to the parent ``_get_impl`` with a shard-aware fallback.

        The fallback calls ``fallback_fn`` directly when a shard is already
        selected; otherwise it tries each shard ID produced by
        ``id_chooser(self, ident)`` in order and returns the first result
        that is not None, or None when no shard produced one.
        """

        def _fallback(query, ident):
            if self._shard_id is not None:
                return fallback_fn(self, ident)

            ident_list = util.to_list(ident)
            for shard_id in self.id_chooser(self, ident_list):
                found = fallback_fn(self.set_shard(shard_id), ident_list)
                if found is not None:
                    return found
            return None

        return super(ShardedQuery, self)._get_impl(ident, _fallback)


class ShardedSession(Session):
    """A Session that routes work to one of several registered binds."""

    def __init__(self, shard_chooser, id_chooser, query_chooser, shards=None,
                 query_cls=ShardedQuery, **kwargs):
        """Construct a ShardedSession.

        :param shard_chooser: A callable which, passed a Mapper, a mapped
          instance, and possibly a SQL clause, returns a shard ID.  The ID
          may be derived from attributes present on the object, or from
          some round-robin scheme.  If the scheme is based on a selection,
          it should set whatever state on the instance is needed to mark it
          as participating in that shard in the future.

        :param id_chooser: A callable, passed a query and a tuple of
          identity values, which should return a list of shard IDs where
          the ID might reside.  The databases will be queried in the order
          of this listing.

        :param query_chooser: For a given Query, returns the list of
          shard IDs where the query should be issued.  Results from all
          shards returned will be combined together into a single listing.

        :param shards: A dictionary of string shard names
          to :class:`~sqlalchemy.engine.Engine` objects.

        """
        super(ShardedSession, self).__init__(query_cls=query_cls, **kwargs)
        self.shard_chooser = shard_chooser
        self.id_chooser = id_chooser
        self.query_chooser = query_chooser
        # Maps shard ID -> bind; looked up by get_bind().
        self.__binds = {}
        self.connection_callable = self.connection
        if shards is not None:
            for shard_name in shards:
                self.bind_shard(shard_name, shards[shard_name])

    def connection(self, mapper=None, instance=None, shard_id=None, **kwargs):
        """Return a connection for a shard.

        When ``shard_id`` is None it is obtained from
        ``shard_chooser(mapper, instance)``.  If ``self.transaction`` is
        set, the connection comes from that transaction; otherwise it comes
        from ``contextual_connect(**kwargs)`` on the shard's bind.
        """
        if shard_id is None:
            shard_id = self.shard_chooser(mapper, instance)

        if self.transaction is not None:
            return self.transaction.connection(mapper, shard_id=shard_id)

        bind = self.get_bind(
            mapper,
            shard_id=shard_id,
            instance=instance
        )
        return bind.contextual_connect(**kwargs)

    def get_bind(self, mapper, shard_id=None,
                 instance=None, clause=None, **kw):
        """Return the bind registered for a shard.

        When ``shard_id`` is None it is obtained from
        ``shard_chooser(mapper, instance, clause=clause)``.  Raises
        ``KeyError`` if no bind is stored under the resulting shard ID.
        """
        if shard_id is None:
            shard_id = self.shard_chooser(mapper, instance, clause=clause)
        return self.__binds[shard_id]

    def bind_shard(self, shard_id, bind):
        # NOTE(review): the body of this method is elided in the excerpt
        # under review; the placeholder is kept as-is and the real body
        # must be restored from the upstream source rather than guessed.
        ...


# Learn to execute automation testing from scratch with LambdaTest Learning
# Hub. Right from setting up the prerequisites to run your first automation
# test, to following best practices and diving deeper into advanced test
# scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides
# to help you be proficient with different test automation frameworks i.e.
# Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
