Best Python code snippet using slash
table.py
Source:table.py  
...73        for i, player in enumerate(self.players):74            if player.position == TablePosition.SB:75                self.pot += player.bet(0.5)76                self._change_bet_to_match(0.5)77                self._write_event("%s: posts small blind $%.2f" % (player.name, SB))78            elif player.position == TablePosition.BB:79                self.pot += player.bet(1)80                self._change_bet_to_match(1)81                self.last_bet_placed_by = player82                self._write_event("%s: posts big blind $%.2f" % (player.name, BB))83        if self.hand_history_enabled:84            self._write_hole_cards()85        return self._get_observation(self.players[self.next_player_i])86    def step(self, action: Action):87        self.current_player_i = self.next_player_i88        player = self.players[self.current_player_i]89        self.current_turn += 190        if (player.all_in or player.state is not PlayerState.ACTIVE) and not self.hand_is_over:91            raise Exception("A player who is inactive or all-in was allowed to act")92        if self.first_to_act is None:93            self.first_to_act = player94        # Apply the player action95        if not (self.hand_is_over or self.street_finished):96            valid_actions = self._get_valid_actions(player)97            if not self._is_action_valid(player, action, valid_actions):98                player.punish_invalid_action()99            elif action.action_type is PlayerAction.FOLD:100                player.fold()101                self.active_players -= 1102                self._write_event("%s: folds" % player.name)103            elif action.action_type is PlayerAction.CHECK:104                player.check()105                self._write_event("%s: checks" % player.name)106            elif action.action_type is PlayerAction.CALL:107                call_size = player.call(self.bet_to_match)108                self.pot += call_size109                if player.all_in:110                    self._write_event("%s: calls $%.2f and is all-in" % (player.name, call_size * BB))111                else:112                    self._write_event("%s: calls $%.2f" % (player.name, call_size * BB))113            elif action.action_type is PlayerAction.BET:114                previous_bet_this_street = player.bet_this_street115                actual_bet_size = player.bet(np.round(action.bet_amount, 2))116                self.pot += actual_bet_size117                if self.bet_to_match == 0:118                    if player.all_in:119                        self._write_event("%s: bets $%.2f and is all-in" % (player.name, actual_bet_size * BB))120                    else:121                        self._write_event("%s: bets $%.2f" % (player.name, actual_bet_size * BB))122                else:123                    if player.all_in:124                        self._write_event("%s: raises $%.2f to $%.2f and is all-in" %125                                          (player.name,126                                           ((actual_bet_size + previous_bet_this_street) - self.bet_to_match) * BB,127                                           (actual_bet_size + previous_bet_this_street) * BB)128                                          )129                    else:130                        self._write_event("%s: raises $%.2f to $%.2f" %131                                          (player.name,132                                           ((actual_bet_size + previous_bet_this_street) - self.bet_to_match) * BB,133                                           (actual_bet_size + previous_bet_this_street) * BB)134                                          )135                self._change_bet_to_match(actual_bet_size + previous_bet_this_street)136                self.last_bet_placed_by = player137            else:138                raise Exception("Error when parsing action, make sure player action_type is PlayerAction and not int")139            should_transition_to_end = False140            players_with_actions = [p for p in self.players if p.state is PlayerState.ACTIVE if not p.all_in]141            players_who_should_act = [p for p in players_with_actions if (not p.acted_this_street or p.bet_this_street != self.bet_to_match)]142            # If the game is over, or the betting street is finished, progress the game state143            if len(players_with_actions) < 2 and len(players_who_should_act) == 0:144                amount = 0145                # If all active players are all-in, transition to the end, allowing no actions in the remaining streets146                if self.active_players > 1:147                    biggest_bet_call = max(148                        [p.bet_this_street for p in self.players149                         if p.state is PlayerState.ACTIVE if p is not self.last_bet_placed_by]150                    )151                    last_bet_this_street = 0152                    if self.last_bet_placed_by is not None:153                        last_bet_this_street = self.last_bet_placed_by.bet_this_street154                    if biggest_bet_call < last_bet_this_street:155                        amount = last_bet_this_street - biggest_bet_call156                    should_transition_to_end = True157                # If everyone else has folded, end the hand158                else:159                    self.hand_is_over = True160                    amount = self.minimum_raise161                # If there are uncalled bets, return them to the player who placed them162                if amount > 0:163                    self.pot -= amount164                    self.last_bet_placed_by.stack += amount165                    self.last_bet_placed_by.money_in_pot -= amount166                    self.last_bet_placed_by.bet_this_street -= amount167                    self._write_event(168                        "Uncalled bet ($%.2f) returned to %s" % (amount * BB, self.last_bet_placed_by.name)169                    )170                if should_transition_to_end:171                    self._street_transition(transition_to_end=True)172            # If the betting street is still active, choose next player to act173            else:174                active_players_after = [i for i in range(self.n_players) if i > self.current_player_i if175                                        self.players[i].state is PlayerState.ACTIVE if not self.players[i].all_in]176                active_players_before = [i for i in range(self.n_players) if i <= self.current_player_i if177                                         self.players[i].state is PlayerState.ACTIVE if not self.players[i].all_in]178                if len(active_players_after) > 0:179                    self.next_player_i = min(active_players_after)180                else:181                    self.next_player_i = min(active_players_before)182                next_player = self.players[self.next_player_i]183                if self.last_bet_placed_by is next_player or (self.first_to_act is next_player and self.last_bet_placed_by is None):184                    self.street_finished = True185                    if len(active_players_before) > 0:186                        self.next_player_i = min(active_players_before)187        if self.street_finished and not self.hand_is_over:188            self._street_transition()189        obs = np.zeros(self.observation_space.shape[0]) if self.hand_is_over else self._get_observation(self.players[self.next_player_i])190        rewards = np.asarray([player.get_reward() for player in sorted(self.players)])191        return obs, rewards, self.hand_is_over, {}192    def _street_transition(self, transition_to_end=False):193        transitioned = False194        if self.street == GameState.PREFLOP:195            self.cards = self.deck.draw(3)196            self._write_event("*** FLOP *** [%s %s %s]" %197                              (Card.int_to_str(self.cards[0]), Card.int_to_str(self.cards[1]),198                               Card.int_to_str(self.cards[2])))199            self.street = GameState.FLOP200            transitioned = True201        if self.street == GameState.FLOP and (not transitioned or transition_to_end):202            new = self.deck.draw(1)203            self.cards.append(new)204            self._write_event("*** TURN *** [%s %s %s] [%s]" %205                              (Card.int_to_str(self.cards[0]), Card.int_to_str(self.cards[1]),206                               Card.int_to_str(self.cards[2]), Card.int_to_str(self.cards[3])))207            self.street = GameState.TURN208            transitioned = True209        if self.street == GameState.TURN and (not transitioned or transition_to_end):210            new = self.deck.draw(1)211            self.cards.append(new)212            self._write_event("*** RIVER *** [%s %s %s %s] [%s]" %213                              (Card.int_to_str(self.cards[0]), Card.int_to_str(self.cards[1]),214                               Card.int_to_str(self.cards[2]), Card.int_to_str(self.cards[3]),215                               Card.int_to_str(self.cards[4])))216            self.street = GameState.RIVER217            transitioned = True218        if self.street == GameState.RIVER and (not transitioned or transition_to_end):219            if not self.hand_is_over:220                if self.hand_history_enabled:221                    self._write_show_down()222            self.hand_is_over = True223        self.street_finished = False224        self.last_bet_placed_by = None225        self.first_to_act = None226        self.bet_to_match = 0227        self.minimum_raise = 0228        for player in self.players:229            player.finish_street()230    def _change_bet_to_match(self, new_amount):231        self.minimum_raise = new_amount - self.bet_to_match232        self.bet_to_match = new_amount233    def _write_event(self, text):234        if self.hand_history_enabled:235            self.hand_history.append(text)236    def _history_initialize(self):237        t = time.localtime()238        self.hand_history.append("PokerStars Hand #%d: Hold'em No Limit ($%.2f/$%.2f USD) - %d/%d/%d %d:%d:%d ET" %239                            (np.random.randint(2230397, 32303976), SB, BB, t.tm_year, t.tm_mon, t.tm_mday, t.tm_hour,240                             t.tm_min, t.tm_sec))241        self.hand_history.append("Table 'Wempe III' 6-max Seat #2 is the button")242        for i, player in enumerate(self.players):243            self.hand_history.append("Seat %d: %s ($%.2f in chips)" % (i+1, player.name, player.stack*BB))244    def _write_hole_cards(self):245        self.hand_history.append("*** HOLE CARDS ***")246        for i, player in enumerate(self.players):247            if self.track_single_player or player.identifier == 0:248                self.hand_history.append("Dealt to %s [%s %s]" %249                                    (player.name, Card.int_to_str(player.cards[0]), Card.int_to_str(player.cards[1])))250    def _write_show_down(self):251        self.hand_history.append("*** SHOW DOWN ***")252        hand_types = [self.evaluator.class_to_string(self.evaluator.get_rank_class(p.hand_rank))253                        for p in self.players if p.state is PlayerState.ACTIVE]254        for player in self.players:255            if player.state is PlayerState.ACTIVE:256                player.calculate_hand_rank(self.evaluator, self.cards)257                player_hand_type = self.evaluator.class_to_string(self.evaluator.get_rank_class(player.hand_rank))258                matches = len([m for m in hand_types if m is player_hand_type])259                multiple = matches > 1260                self.hand_history.append("%s: shows [%s %s] (%s)" %261                                    (player.name, Card.int_to_str(player.cards[0]), Card.int_to_str(player.cards[1]),262                                     pretty_print_hand(player.cards, player_hand_type, self.cards, multiple))263                                    )264    def _finish_hand(self):265        for player in self.players:266            if self.hand_history_enabled:267                if player.winnings_for_hh > 0:268                    self._write_event("%s collected $%.2f from pot" % (player.name, player.winnings_for_hh*BB))269        self._write_event("*** SUMMARY ***")270        self._write_event("Total pot $%.2f | Rake $%.2f" % (self.pot*BB, 0))271        if self.street == GameState.FLOP:272            self._write_event("Board [%s %s %s]" %273                                (Card.int_to_str(self.cards[0]), Card.int_to_str(self.cards[1]),274                                 Card.int_to_str(self.cards[2]))275                                )276        elif self.street == GameState.TURN:277            self._write_event("Board [%s %s %s %s]" %278                                (Card.int_to_str(self.cards[0]), Card.int_to_str(self.cards[1]),279                                 Card.int_to_str(self.cards[2]), Card.int_to_str(self.cards[3]))280                                )281        elif self.street == GameState.RIVER:282            self._write_event("Board [%s %s %s %s %s]" %283                                (Card.int_to_str(self.cards[0]), Card.int_to_str(self.cards[1]),284                                 Card.int_to_str(self.cards[2]), Card.int_to_str(self.cards[3]),285                                 Card.int_to_str(self.cards[4]))286                                )287        if self.hand_history_enabled and self.hand_history_location is not None:288            with open('%s' % self.hand_history_location + 'handhistory_%s.txt' % time.time(), 'w') as f:289                for row in self.hand_history:290                    f.writelines(row + '\n')291    def _distribute_pot(self):292        pot = 0293        for player in self.players:294            if player.state is not PlayerState.ACTIVE:295                pot += player.money_in_pot296                player.winnings -= player.money_in_pot297        active_players = [p for p in self.players if p.state is PlayerState.ACTIVE]298        if len(active_players) == 1:299            active_players[0].winnings += pot300            active_players[0].winnings_for_hh += pot + active_players[0].money_in_pot301            return302        for player in active_players:303            player.calculate_hand_rank(self.evaluator, self.cards)304        while True:305            min_money_in_pot = min([p.money_in_pot for p in active_players])306            for player in active_players:307                pot += min_money_in_pot308                player.money_in_pot -= min_money_in_pot309                player.winnings -= min_money_in_pot310            best_hand_rank = min([p.hand_rank for p in active_players])311            winners = [p for p in active_players if p.hand_rank == best_hand_rank]312            for winner in winners:313                winner.winnings += pot / len(winners)314                winner.winnings_for_hh += pot / len(winners)315            active_players = [p for p in active_players if p.money_in_pot > 0]316            if len(active_players) <= 1:317                if len(active_players) == 1:318                    active_players[0].winnings += active_players[0].money_in_pot319                    active_players[0].winnings_for_hh += active_players[0].money_in_pot320                break321            pot = 0322    def _is_action_valid(self, player, action, valid_actions):323        action_list, bet_range = valid_actions['actions_list'], valid_actions['bet_range']324        if action.action_type not in action_list:325            if PlayerAction.FOLD in action_list:326                player.fold()327                self.active_players -= 1328                self._write_event("%s: folds" % player.name)329                return False330            if PlayerAction.CHECK in action_list:331                player.check()332                self._write_event("%s: checks" % player.name)333                return False334            raise Exception('Something went wrong when validating actions, invalid contents of valid_actions')335        if action.action_type is PlayerAction.BET:336            if not (approx_lte(bet_range[0], action.bet_amount) and approx_lte(action.bet_amount, bet_range[1])) or approx_gt(action.bet_amount, player.stack):337                if PlayerAction.FOLD in action_list:338                    player.fold()339                    self.active_players -= 1340                    self._write_event("%s: folds" % player.name)341                else:342                    player.check()343                    self._write_event("%s: checks" % player.name)344                return False345        return True346    def _get_valid_actions(self, player):347        valid_actions = [PlayerAction.CHECK, PlayerAction.FOLD, PlayerAction.BET, PlayerAction.CALL]348        valid_bet_range = [max(self.bet_to_match + self.minimum_raise, 1), player.stack]349        others_active = [p for p in self.players if p.state is PlayerState.ACTIVE if not p.all_in if p is not player]350        if self.bet_to_match == 0:351            valid_actions.remove(PlayerAction.CALL)352            valid_actions.remove(PlayerAction.FOLD)353        if self.bet_to_match != 0:354            valid_actions.remove(PlayerAction.CHECK)355        if player.stack < max(self.bet_to_match + self.minimum_raise, 1):356            valid_bet_range = [0, 0]357            valid_actions.remove(PlayerAction.BET)...versioned.py
Source:versioned.py  
1# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license2"""DNS Versioned Zones."""3import collections4try:5    import threading as _threading6except ImportError:  # pragma: no cover7    import dummy_threading as _threading    # type: ignore8import dns.exception9import dns.immutable10import dns.name11import dns.rdataclass12import dns.rdatatype13import dns.rdtypes.ANY.SOA14import dns.zone15class UseTransaction(dns.exception.DNSException):16    """To alter a versioned zone, use a transaction."""17# Backwards compatibility18Node = dns.zone.VersionedNode19ImmutableNode = dns.zone.ImmutableVersionedNode20Version = dns.zone.Version21WritableVersion = dns.zone.WritableVersion22ImmutableVersion = dns.zone.ImmutableVersion23Transaction = dns.zone.Transaction24class Zone(dns.zone.Zone):25    __slots__ = ['_versions', '_versions_lock', '_write_txn',26                 '_write_waiters', '_write_event', '_pruning_policy',27                 '_readers']28    node_factory = Node29    def __init__(self, origin, rdclass=dns.rdataclass.IN, relativize=True,30                 pruning_policy=None):31        """Initialize a versioned zone object.32        *origin* is the origin of the zone.  It may be a ``dns.name.Name``,33        a ``str``, or ``None``.  If ``None``, then the zone's origin will34        be set by the first ``$ORIGIN`` line in a zone file.35        *rdclass*, an ``int``, the zone's rdata class; the default is class IN.36        *relativize*, a ``bool``, determine's whether domain names are37        relativized to the zone's origin.  The default is ``True``.38        *pruning policy*, a function taking a `Version` and returning39        a `bool`, or `None`.  Should the version be pruned?  If `None`,40        the default policy, which retains one version is used.41        """42        super().__init__(origin, rdclass, relativize)43        self._versions = collections.deque()44        self._version_lock = _threading.Lock()45        if pruning_policy is None:46            self._pruning_policy = self._default_pruning_policy47        else:48            self._pruning_policy = pruning_policy49        self._write_txn = None50        self._write_event = None51        self._write_waiters = collections.deque()52        self._readers = set()53        self._commit_version_unlocked(None,54                                      WritableVersion(self, replacement=True),55                                      origin)56    def reader(self, id=None, serial=None):  # pylint: disable=arguments-differ57        if id is not None and serial is not None:58            raise ValueError('cannot specify both id and serial')59        with self._version_lock:60            if id is not None:61                version = None62                for v in reversed(self._versions):63                    if v.id == id:64                        version = v65                        break66                if version is None:67                    raise KeyError('version not found')68            elif serial is not None:69                if self.relativize:70                    oname = dns.name.empty71                else:72                    oname = self.origin73                version = None74                for v in reversed(self._versions):75                    n = v.nodes.get(oname)76                    if n:77                        rds = n.get_rdataset(self.rdclass, dns.rdatatype.SOA)78                        if rds and rds[0].serial == serial:79                            version = v80                            break81                if version is None:82                    raise KeyError('serial not found')83            else:84                version = self._versions[-1]85            txn = Transaction(self, False, version)86            self._readers.add(txn)87            return txn88    def writer(self, replacement=False):89        event = None90        while True:91            with self._version_lock:92                # Checking event == self._write_event ensures that either93                # no one was waiting before we got lucky and found no write94                # txn, or we were the one who was waiting and got woken up.95                # This prevents "taking cuts" when creating a write txn.96                if self._write_txn is None and event == self._write_event:97                    # Creating the transaction defers version setup98                    # (i.e.  copying the nodes dictionary) until we99                    # give up the lock, so that we hold the lock as100                    # short a time as possible.  This is why we call101                    # _setup_version() below.102                    self._write_txn = Transaction(self, replacement,103                                                  make_immutable=True)104                    # give up our exclusive right to make a Transaction105                    self._write_event = None106                    break107                # Someone else is writing already, so we will have to108                # wait, but we want to do the actual wait outside the109                # lock.110                event = _threading.Event()111                self._write_waiters.append(event)112            # wait (note we gave up the lock!)113            #114            # We only wake one sleeper at a time, so it's important115            # that no event waiter can exit this method (e.g. via116            # cancellation) without returning a transaction or waking117            # someone else up.118            #119            # This is not a problem with Threading module threads as120            # they cannot be canceled, but could be an issue with trio121            # or curio tasks when we do the async version of writer().122            # I.e. we'd need to do something like:123            #124            # try:125            #     event.wait()126            # except trio.Cancelled:127            #     with self._version_lock:128            #         self._maybe_wakeup_one_waiter_unlocked()129            #     raise130            #131            event.wait()132        # Do the deferred version setup.133        self._write_txn._setup_version()134        return self._write_txn135    def _maybe_wakeup_one_waiter_unlocked(self):136        if len(self._write_waiters) > 0:137            self._write_event = self._write_waiters.popleft()138            self._write_event.set()139    # pylint: disable=unused-argument140    def _default_pruning_policy(self, zone, version):141        return True142    # pylint: enable=unused-argument143    def _prune_versions_unlocked(self):144        assert len(self._versions) > 0145        # Don't ever prune a version greater than or equal to one that146        # a reader has open.  This pins versions in memory while the147        # reader is open, and importantly lets the reader open a txn on148        # a successor version (e.g. if generating an IXFR).149        #150        # Note our definition of least_kept also ensures we do not try to151        # delete the greatest version.152        if len(self._readers) > 0:153            least_kept = min(txn.version.id for txn in self._readers)154        else:155            least_kept = self._versions[-1].id156        while self._versions[0].id < least_kept and \157              self._pruning_policy(self, self._versions[0]):158            self._versions.popleft()159    def set_max_versions(self, max_versions):160        """Set a pruning policy that retains up to the specified number161        of versions162        """163        if max_versions is not None and max_versions < 1:164            raise ValueError('max versions must be at least 1')165        if max_versions is None:166            def policy(*_):167                return False168        else:169            def policy(zone, _):170                return len(zone._versions) > max_versions171        self.set_pruning_policy(policy)172    def set_pruning_policy(self, policy):173        """Set the pruning policy for the zone.174        The *policy* function takes a `Version` and returns `True` if175        the version should be pruned, and `False` otherwise.  `None`176        may also be specified for policy, in which case the default policy177        is used.178        Pruning checking proceeds from the least version and the first179        time the function returns `False`, the checking stops.  I.e. the180        retained versions are always a consecutive sequence.181        """182        if policy is None:183            policy = self._default_pruning_policy184        with self._version_lock:185            self._pruning_policy = policy186            self._prune_versions_unlocked()187    def _end_read(self, txn):188        with self._version_lock:189            self._readers.remove(txn)190            self._prune_versions_unlocked()191    def _end_write_unlocked(self, txn):192        assert self._write_txn == txn193        self._write_txn = None194        self._maybe_wakeup_one_waiter_unlocked()195    def _end_write(self, txn):196        with self._version_lock:197            self._end_write_unlocked(txn)198    def _commit_version_unlocked(self, txn, version, origin):199        self._versions.append(version)200        self._prune_versions_unlocked()201        self.nodes = version.nodes202        if self.origin is None:203            self.origin = origin204        # txn can be None in __init__ when we make the empty version.205        if txn is not None:206            self._end_write_unlocked(txn)207    def _commit_version(self, txn, version, origin):208        with self._version_lock:209            self._commit_version_unlocked(txn, version, origin)210    def _get_next_version_id(self):211        if len(self._versions) > 0:212            id = self._versions[-1].id + 1213        else:214            id = 1215        return id216    def find_node(self, name, create=False):217        if create:218            raise UseTransaction219        return super().find_node(name)220    def delete_node(self, name):221        raise UseTransaction222    def find_rdataset(self, name, rdtype, covers=dns.rdatatype.NONE,223                      create=False):224        if create:225            raise UseTransaction226        rdataset = super().find_rdataset(name, rdtype, covers)227        return dns.rdataset.ImmutableRdataset(rdataset)228    def get_rdataset(self, name, rdtype, covers=dns.rdatatype.NONE,229                     create=False):230        if create:231            raise UseTransaction232        rdataset = super().get_rdataset(name, rdtype, covers)233        return dns.rdataset.ImmutableRdataset(rdataset)234    def delete_rdataset(self, name, rdtype, covers=dns.rdatatype.NONE):235        raise UseTransaction236    def replace_rdataset(self, name, replacement):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
