Best Python code snippet using autotest_python
client.py
Source:client.py  
#!/usr/local/bin/python3
"""Command-line client that forwards banking-style transactions to branch servers.

Commands are read from stdin, queued, and sent to the servers listed in a
configuration file. Every outgoing message is the client id, a space and the
command text, left-justified and space-padded to 256 characters.
"""
import sys
import socket
import queue
import threading


class Client:
    """Client that talks to every configured branch server over TCP.

    Three threads cooperate (see ``run``): one reads user commands from stdin,
    one sends queued commands, and one polls the sockets for server replies.
    State shared between the sender and the receiver is guarded by ``my_lock``.
    """

    def __init__(self, id, configFile):
        """Store the client id and config path and initialise all state.

        Args:
            id: Identifier prefixed to every message sent to a server.
            configFile: Path of the file read by ``read_config``.
        """
        self.id = id
        self.config_file = configFile
        # True when the reply to the previously sent command has arrived, so
        # the sender thread may transmit the next queued command.
        self.received_last = True
        # True between a user "BEGIN" and the end of that transaction.
        self.begin_flag = False
        self.resend_abort = False
        self.message_to_user = None
        # Set when the client itself broadcast ABORT after a server-side
        # failure; keeps the failure text from being replaced by "ABORTED".
        self.force_abort = False
        self.my_lock = threading.Lock()
        self.all_servers = {}   # branch name -> [ip, port] (both strings)
        self.all_sockets = {}   # branch name -> connected socket
        self.server_involved = {}
        self.commit_ok = 0      # "COMMIT OK" replies counted so far
        self.abort_ok = 0       # "ABORTED" replies counted so far
        self.output_queue = queue.Queue()
        self.transactions_queue = queue.Queue()
        self.response_buffer = queue.Queue()
        self.current_trans_id = 0
        self.current_response_id = -1

    def read_config(self):
        """Load ``branch ip port`` triples from the config file into ``all_servers``.

        Blank lines are skipped. The file is closed even if parsing fails.

        Raises:
            OSError: If the config file cannot be opened.
            ValueError: If a non-blank line does not hold exactly three fields.
        """
        with open(self.config_file, 'r') as config:
            for line in config:
                line = line.strip()
                if not line:
                    continue
                branch, ip, port = line.split()
                self.all_servers[branch] = [ip, port]

    def _frame(self, payload):
        """Return the encoded wire form of ``payload``, padded to 256 characters."""
        return "{:<256}".format(str(self.id) + " " + payload).encode()

    def _send_to(self, server, payload):
        """Send ``payload`` to one branch; ``sendall`` avoids partial frames."""
        self.all_sockets[server].sendall(self._frame(payload))

    def _broadcast(self, payload):
        """Send ``payload`` to every configured branch server."""
        frame = self._frame(payload)
        for server in self.all_servers:
            self.all_sockets[server].sendall(frame)

    def _next_command_is(self, command):
        """Return True if the head of the command queue equals ``command``."""
        pending = self.transactions_queue
        return (not pending.empty()) and pending.queue[0] == command

    @staticmethod
    def _branch_of(server_account):
        """Return the branch part of a ``branch.account`` string.

        Raises:
            IndexError: If there is no '.' separator; the account part is
                indexed on purpose so malformed input fails before any send.
        """
        parts = server_account.split(".")
        branch, _account = parts[0], parts[1]
        return branch

    def check_and_send(self):
        """Sender loop: forward queued user commands to the servers. Never returns.

        A bare "ABORT" at the head of the queue is broadcast immediately, even
        while a reply is still outstanding. Any other command waits until
        ``received_last`` is set. "BEGIN" is answered locally with "OK".
        Commands dequeued while no transaction is open are discarded.

        NOTE: the loop polls the queue and flag without sleeping or blocking.
        """
        while True:
            if self._next_command_is("ABORT"):
                with self.my_lock:
                    self._broadcast(self.transactions_queue.get())
                    self.received_last = False
                    self.force_abort = False
                continue

            with self.my_lock:
                check_received = self.received_last
            if not check_received:
                continue

            if self._next_command_is("BEGIN"):
                with self.my_lock:
                    self.begin_flag = True
                    self.force_abort = False
                    self.transactions_queue.get()
                print("OK")

            if self.transactions_queue.empty():
                continue
            trans = self.transactions_queue.get()
            with self.my_lock:
                check_begin = self.begin_flag
            if not check_begin:
                continue

            trans_type, *element = trans.strip().split()
            self.current_trans_id += 1
            if trans_type in ("DEPOSIT", "WITHDRAW"):
                # Exactly two arguments are required: "branch.account amount".
                server_account, _amount = element
                self._send_to(self._branch_of(server_account), trans)
                with self.my_lock:
                    self.received_last = False
            elif trans_type == "BALANCE":
                self._send_to(self._branch_of(element[0]), trans)
                with self.my_lock:
                    self.received_last = False
            elif trans_type in ("COMMIT", "ABORT"):
                with self.my_lock:
                    self._broadcast(trans)
                    self.received_last = False

    def handle_user(self):
        """Queue every non-blank stdin line (stripped) as a command."""
        for line in sys.stdin:
            command = line.strip()
            if command:
                self.transactions_queue.put(command)

    def _handle_response(self, msg_type):
        """Update client state, and print output, for one stripped server reply."""
        if msg_type in ("NOT FOUND, ABORTED", "COMMIT ABORTED"):
            # A server rejected the transaction: remember its message and tell
            # every branch to abort.
            with self.my_lock:
                self.message_to_user = msg_type
                self.begin_flag = False
                self.force_abort = True
                self._broadcast("ABORT")

        if msg_type == "ABORTED":
            with self.my_lock:
                self.begin_flag = False
                self.abort_ok += 1
                if not self.force_abort:
                    self.message_to_user = "ABORTED"
        elif msg_type == "COMMIT OK":
            with self.my_lock:
                self.begin_flag = False
                self.commit_ok += 1
                if self.commit_ok == len(self.all_servers):
                    # Every branch agreed, so confirm the commit to all.
                    self._broadcast("COMMIT_CONFIRM")
                    self.received_last = False
                    self.message_to_user = "COMMIT OK"
        elif msg_type == "COMMITTED":
            with self.my_lock:
                self.begin_flag = False
        elif msg_type == "OK":
            with self.my_lock:
                self.received_last = True
                self.message_to_user = "OK"
                print(self.message_to_user)
        elif "BALANCE" in msg_type:
            content = msg_type.split(":")[1]
            with self.my_lock:
                self.message_to_user = content
                self.received_last = True
                print(self.message_to_user)

        with self.my_lock:
            everyone = len(self.all_servers)
            if self.commit_ok == everyone or self.abort_ok == everyone:
                # All branches have answered: report the outcome, reset the
                # counters and let the sender continue.
                print(self.message_to_user)
                self.received_last = True
                self.abort_ok = 0
                self.commit_ok = 0

    def get_response(self):
        """Receiver loop: poll every socket and process replies. Never returns.

        Each socket is read without blocking, up to 256 bytes at a time. Empty
        reads and would-block results are skipped.

        NOTE: the loop polls without sleeping.
        """
        while True:
            for server in self.all_sockets:
                my_socket = self.all_sockets[server]
                try:
                    response = my_socket.recv(256, socket.MSG_DONTWAIT).decode()
                    response = response.strip()
                except BlockingIOError:
                    response = None
                if response:
                    self._handle_response(response)

    def connect_to_all_servers(self):
        """Open one TCP connection per configured branch into ``all_sockets``."""
        for branch in self.all_servers:
            host = self.all_servers[branch][0]
            port = int(self.all_servers[branch][1])
            self.all_sockets[branch] = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.all_sockets[branch].connect((host, port))

    def run(self):
        """Read the config, connect to all servers and start the three worker threads."""
        self.read_config()
        self.connect_to_all_servers()
        get_input = threading.Thread(target=self.handle_user)
        my_rcv = threading.Thread(target=self.get_response)
        my_send = threading.Thread(target=self.check_and_send)
        get_input.start()
        my_rcv.start()
        my_send.start()


def main():
    """Start a client from ``argv``: client id first, config file path second."""
    my_id = sys.argv[1]
    config_file = sys.argv[2]
    new_client = Client(my_id, config_file)
    new_client.run()
Source:client  
#!/usr/bin/python3
"""Command-line client that forwards banking-style transactions to branch servers.

Commands are read from stdin, queued, and sent to the servers listed in a
configuration file. Every outgoing message is the client id, a space and the
command text, left-justified and space-padded to 256 characters.
"""
import sys
import socket
import queue
import threading


class Client:
    """Client that talks to every configured branch server over TCP.

    Three threads cooperate (see ``run``): one reads user commands from stdin,
    one sends queued commands, and one polls the sockets for server replies.
    State shared between the sender and the receiver is guarded by ``my_lock``.
    """

    def __init__(self, id, configFile):
        """Store the client id and config path and initialise all state.

        Args:
            id: Identifier prefixed to every message sent to a server.
            configFile: Path of the file read by ``read_config``.
        """
        self.id = id
        self.config_file = configFile
        # True when the reply to the previously sent command has arrived, so
        # the sender thread may transmit the next queued command.
        self.received_last = True
        # True between a user "BEGIN" and the end of that transaction.
        self.begin_flag = False
        self.resend_abort = False
        self.message_to_user = None
        # Set when the client itself broadcast ABORT after a server-side
        # failure; keeps the failure text from being replaced by "ABORTED".
        self.force_abort = False
        self.my_lock = threading.Lock()
        self.all_servers = {}   # branch name -> [ip, port] (both strings)
        self.all_sockets = {}   # branch name -> connected socket
        self.server_involved = {}
        self.commit_ok = 0      # "COMMIT OK" replies counted so far
        self.abort_ok = 0       # "ABORTED" replies counted so far
        self.output_queue = queue.Queue()
        self.transactions_queue = queue.Queue()
        self.response_buffer = queue.Queue()
        self.current_trans_id = 0
        self.current_response_id = -1

    def read_config(self):
        """Load ``branch ip port`` triples from the config file into ``all_servers``.

        Blank lines are skipped. The file is closed even if parsing fails.

        Raises:
            OSError: If the config file cannot be opened.
            ValueError: If a non-blank line does not hold exactly three fields.
        """
        with open(self.config_file, 'r') as config:
            for line in config:
                line = line.strip()
                if not line:
                    continue
                branch, ip, port = line.split()
                self.all_servers[branch] = [ip, port]

    def _frame(self, payload):
        """Return the encoded wire form of ``payload``, padded to 256 characters."""
        return "{:<256}".format(str(self.id) + " " + payload).encode()

    def _send_to(self, server, payload):
        """Send ``payload`` to one branch; ``sendall`` avoids partial frames."""
        self.all_sockets[server].sendall(self._frame(payload))

    def _broadcast(self, payload):
        """Send ``payload`` to every configured branch server."""
        frame = self._frame(payload)
        for server in self.all_servers:
            self.all_sockets[server].sendall(frame)

    def _next_command_is(self, command):
        """Return True if the head of the command queue equals ``command``."""
        pending = self.transactions_queue
        return (not pending.empty()) and pending.queue[0] == command

    @staticmethod
    def _branch_of(server_account):
        """Return the branch part of a ``branch.account`` string.

        Raises:
            IndexError: If there is no '.' separator; the account part is
                indexed on purpose so malformed input fails before any send.
        """
        parts = server_account.split(".")
        branch, _account = parts[0], parts[1]
        return branch

    def check_and_send(self):
        """Sender loop: forward queued user commands to the servers. Never returns.

        A bare "ABORT" at the head of the queue is broadcast immediately, even
        while a reply is still outstanding. Any other command waits until
        ``received_last`` is set. "BEGIN" is answered locally with "OK".
        Commands dequeued while no transaction is open are discarded.

        NOTE: the loop polls the queue and flag without sleeping or blocking.
        """
        while True:
            if self._next_command_is("ABORT"):
                with self.my_lock:
                    self._broadcast(self.transactions_queue.get())
                    self.received_last = False
                    self.force_abort = False
                continue

            with self.my_lock:
                check_received = self.received_last
            if not check_received:
                continue

            if self._next_command_is("BEGIN"):
                with self.my_lock:
                    self.begin_flag = True
                    self.force_abort = False
                    self.transactions_queue.get()
                print("OK")

            if self.transactions_queue.empty():
                continue
            trans = self.transactions_queue.get()
            with self.my_lock:
                check_begin = self.begin_flag
            if not check_begin:
                continue

            trans_type, *element = trans.strip().split()
            self.current_trans_id += 1
            if trans_type in ("DEPOSIT", "WITHDRAW"):
                # Exactly two arguments are required: "branch.account amount".
                server_account, _amount = element
                self._send_to(self._branch_of(server_account), trans)
                with self.my_lock:
                    self.received_last = False
            elif trans_type == "BALANCE":
                self._send_to(self._branch_of(element[0]), trans)
                with self.my_lock:
                    self.received_last = False
            elif trans_type in ("COMMIT", "ABORT"):
                with self.my_lock:
                    self._broadcast(trans)
                    self.received_last = False

    def handle_user(self):
        """Queue every non-blank stdin line (stripped) as a command."""
        for line in sys.stdin:
            command = line.strip()
            if command:
                self.transactions_queue.put(command)

    def _handle_response(self, msg_type):
        """Update client state, and print output, for one stripped server reply."""
        if msg_type in ("NOT FOUND, ABORTED", "COMMIT ABORTED"):
            # A server rejected the transaction: remember its message and tell
            # every branch to abort.
            with self.my_lock:
                self.message_to_user = msg_type
                self.begin_flag = False
                self.force_abort = True
                self._broadcast("ABORT")

        if msg_type == "ABORTED":
            with self.my_lock:
                self.begin_flag = False
                self.abort_ok += 1
                if not self.force_abort:
                    self.message_to_user = "ABORTED"
        elif msg_type == "COMMIT OK":
            with self.my_lock:
                self.begin_flag = False
                self.commit_ok += 1
                if self.commit_ok == len(self.all_servers):
                    # Every branch agreed, so confirm the commit to all.
                    self._broadcast("COMMIT_CONFIRM")
                    self.received_last = False
                    self.message_to_user = "COMMIT OK"
        elif msg_type == "COMMITTED":
            with self.my_lock:
                self.begin_flag = False
        elif msg_type == "OK":
            with self.my_lock:
                self.received_last = True
                self.message_to_user = "OK"
                print(self.message_to_user)
        elif "BALANCE" in msg_type:
            content = msg_type.split(":")[1]
            with self.my_lock:
                self.message_to_user = content
                self.received_last = True
                print(self.message_to_user)

        with self.my_lock:
            everyone = len(self.all_servers)
            if self.commit_ok == everyone or self.abort_ok == everyone:
                # All branches have answered: report the outcome, reset the
                # counters and let the sender continue.
                print(self.message_to_user)
                self.received_last = True
                self.abort_ok = 0
                self.commit_ok = 0

    def get_response(self):
        """Receiver loop: poll every socket and process replies. Never returns.

        Each socket is read without blocking, up to 256 bytes at a time. Empty
        reads and would-block results are skipped.

        NOTE: the loop polls without sleeping.
        """
        while True:
            for server in self.all_sockets:
                my_socket = self.all_sockets[server]
                try:
                    response = my_socket.recv(256, socket.MSG_DONTWAIT).decode()
                    response = response.strip()
                except BlockingIOError:
                    response = None
                if response:
                    self._handle_response(response)

    def connect_to_all_servers(self):
        """Open one TCP connection per configured branch into ``all_sockets``."""
        for branch in self.all_servers:
            host = self.all_servers[branch][0]
            port = int(self.all_servers[branch][1])
            self.all_sockets[branch] = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.all_sockets[branch].connect((host, port))

    def run(self):
        """Read the config, connect to all servers and start the three worker threads."""
        self.read_config()
        self.connect_to_all_servers()
        get_input = threading.Thread(target=self.handle_user)
        my_rcv = threading.Thread(target=self.get_response)
        my_send = threading.Thread(target=self.check_and_send)
        get_input.start()
        my_rcv.start()
        my_send.start()


def main():
    """Start a client from ``argv``: client id first, config file path second."""
    my_id = sys.argv[1]
    config_file = sys.argv[2]
    new_client = Client(my_id, config_file)
    new_client.run()
Source:tests.py  
# STANDARD LIB
import hashlib

# THIRD PARTY
from django.utils import timezone

# DJANGAE
from djangae.contrib import sleuth
from djangae.test import TestCase
from gcloudc.db.backends.datastore import transaction

from .kinds import LOCK_KINDS
from .lock import (
    Lock,
    LockAcquisitionError,
    lock,
)
from .memcache import MemcacheLock
from .models import DatastoreLock
from .views import cleanup_locks


class DatastoreLocksTestCase(TestCase):
    """ Tests for the implementation of the STRONG kind of lock (DatastoreLock). """

    def _make_lock(self, identifier, **kwargs):
        """ Shortcut for when we need to manually create DatastoreLock objects for tests. """
        identifier_hash = hashlib.md5(identifier.encode()).hexdigest()
        return DatastoreLock.objects.create(identifier_hash=identifier_hash, **kwargs)

    def test_acquire_and_release(self):
        # If we try to acquire the same lock twice then the second one should fail
        first_lock = Lock.acquire("my_lock")
        self.assertIsInstance(first_lock, Lock)
        # Now if we try to acquire the same one again before releasing it, we should get None
        lock_again = Lock.acquire("my_lock", wait=False)
        self.assertIsNone(lock_again)
        # Now if we release it we should then be able to acquire it again
        first_lock.release()
        lock_again = Lock.acquire("my_lock", wait=False)
        self.assertIsInstance(lock_again, Lock)

    def test_context_manager_no_wait(self):
        """ If the lock is already acquired, then our context manager with wait=False should raise
            LockAcquisitionError.
        """
        def do_context():
            with lock('x', wait=False):
                return True

        # With the lock already in use, the context manager should blow up
        my_lock = Lock.acquire('x')
        self.assertRaises(LockAcquisitionError, do_context)
        # And with the lock released the context should be run
        my_lock.release()
        self.assertTrue(do_context())

    def test_context_manager_steal(self):
        """ If the lock is already acquired, but is older than our limit then the context manager
            should steal it.
        """
        def do_context():
            with lock('x', wait=True, steal_after_ms=10):
                return True

        # NOTE(review): 2000 microseconds is 2 ms, which is younger than steal_after_ms=10;
        # presumably milliseconds was intended - confirm.
        self._make_lock('x', timestamp=timezone.now() - timezone.timedelta(microseconds=2000))
        self.assertTrue(do_context())

    def test_decorator_no_wait(self):
        """ If the lock is already acquired, then our decorator with wait=False should not run the
            function.
        """
        @lock('x', wait=False)
        def do_something():
            return True

        # With the lock already in use, the function should not be run
        my_lock = Lock.acquire('x')
        self.assertIsNone(do_something())
        # And with the lock released the function should run
        my_lock.release()
        self.assertTrue(do_something())

    def test_decorator_steal(self):
        """ If the lock is already acquired, but is older than our limit then the decorator should
            steal it.
        """
        @lock('x', wait=True, steal_after_ms=10)
        def do_something():
            return True

        # NOTE(review): 2000 microseconds is 2 ms, which is younger than steal_after_ms=10;
        # presumably milliseconds was intended - confirm.
        self._make_lock('x', timestamp=timezone.now() - timezone.timedelta(microseconds=2000))
        self.assertTrue(do_something())

    def test_cleanup_view(self):
        ages_ago = timezone.now() - timezone.timedelta(minutes=15)
        self._make_lock("old_lock", timestamp=ages_ago)
        recent_lock = self._make_lock("recent_lock")
        cleanup_locks(None)
        self.process_task_queues()
        # The old lock should have been deleted but the new one should not
        self.assertCountEqual(DatastoreLock.objects.all(), [recent_lock])

    def test_transaction_errors_are_handled(self):
        with sleuth.detonate(
            'djangae.contrib.locking.models.LockQuerySet.filter', transaction.TransactionFailedError
        ):
            acquired = Lock.acquire("my_lock", wait=False)
            self.assertIsNone(acquired)

    def test_max_wait_ms(self):
        lock1 = Lock.acquire("my_lock")   # Get the lock
        self.assertTrue(lock1)
        lock2 = Lock.acquire("my_lock", max_wait_ms=100, wait=True, steal_after_ms=10000)  # Wait 100 ms
        # If we stole it, this wouldn't be None
        self.assertIsNone(lock2)


class MemcacheLocksTestCase(TestCase):
    """ Tests for the implementation of the WEAK kind of lock (MemcacheLock). """

    def test_acquire_and_release(self):
        # If we try to acquire the same lock twice then the second one should fail
        first_lock = Lock.acquire("my_lock", kind=LOCK_KINDS.WEAK)
        self.assertIsInstance(first_lock, Lock)
        # Now if we try to acquire the same one again before releasing it, we should get None
        lock_again = Lock.acquire("my_lock", kind=LOCK_KINDS.WEAK, wait=False)
        self.assertIsNone(lock_again)
        # Now if we release it we should then be able to acquire it again
        first_lock.release()
        lock_again = Lock.acquire("my_lock", kind=LOCK_KINDS.WEAK, wait=False)
        self.assertIsInstance(lock_again, Lock)

    def test_context_manager_steal(self):
        """ If the lock is already acquired, but is older than our limit then the context manager
            should steal it.
        """
        def do_context():
            with lock('x', wait=True, steal_after_ms=10, kind=LOCK_KINDS.WEAK):
                return True

        MemcacheLock.acquire('x')  # Acquire the lock
        # If we don't wait, then the lock can't be acquired
        self.assertFalse(MemcacheLock.acquire('x', wait=False))
        self.assertTrue(do_context())  # Should succeed eventually

    def test_context_manager_no_wait(self):
        """ If the lock is already acquired, then our context manager with wait=False should raise
            LockAcquisitionError.
        """
        def do_context():
            with lock('x', wait=False, kind=LOCK_KINDS.WEAK):
                return True

        # With the lock already in use, the context manager should blow up
        my_lock = Lock.acquire('x', kind=LOCK_KINDS.WEAK)
        self.assertRaises(LockAcquisitionError, do_context)
        # And with the lock released the context should be run
        my_lock.release()
        self.assertTrue(do_context())

    def test_decorator_no_wait(self):
        """ If the lock is already acquired, then our decorator with wait=False should not run the
            function.
        """
        @lock('x', wait=False, kind=LOCK_KINDS.WEAK)
        def do_something():
            return True

        # With the lock already in use, the function should not be run
        my_lock = Lock.acquire('x', kind=LOCK_KINDS.WEAK)
        self.assertIsNone(do_something())
        # And with the lock released the function should run
        my_lock.release()
        # NOTE: the snippet is cut off here in the source ("..."); the rest of this test is not visible.

# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the
# prerequisites to run your first automation test, to following best practices and diving deeper into advanced
# test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with
# different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
