Best Python code snippet using tempest_python
paxos_proposers.py
Source:paxos_proposers.py  
1#/usr/bin/env python32import random3import time4import signal5import sys6import json7import socket8import threading9import numpy as np10from typing import List, Any, Dict, Set11from socketserver import TCPServer, BaseRequestHandler, ThreadingMixIn12OKGREEN = '\033[92m'13YELLOW = '\033[93m'14ENDC = '\033[0m'15PORT_ACCEPTOR = 10200            # group port range start, port = PORT + server_id16PORT_PROPOSER = 1021017PORT_REDIRECT = 1022018PORT_FETCH = 1023019PORT_HEARTBEAT = 10240  # group heartbeat port range start20WAITING_TIMEOUT = 3    # time to wait before resending potentially lost msgs21# enums for state machine command sequence22S_UNSURE = 'undefined'23S_NO_OP = 'no-op'24S_ELECT_LEADER_PREFIX = 'S_ELECT_LEADER_'25def S_ELECT_LEADER(leader_id):26    return S_ELECT_LEADER_PREFIX + str(leader_id)27# enums for paxos message types28T_PREP = 'prepare'29T_PREP_R = 'prepare-reply'30T_ACC = 'accept'31T_ACC_R = 'accept-reply'32T_LEARN = 'learn'33# phases34P_PREP = 035P_ACC = 136P_LEARN = 237# waiting status, used in resend logic to handle network outage38W_OK = 039W_WAITING_PREPARE_REPLIES = 140W_WAITING_ACCEPT_REPLIES = 241# TODO: waiting on learn?42def INIT_PROGRESS():43    return {44        'phase': P_PREP,45        'base_n': 0, # proposal number base46        'highest_proposed_v': '',47        'highest_proposed_n': -1, # highest proposal number I've promised48        'prepare': {}, # stores prepare responses49        'accepted_reply': {} # stores accept responses50    }51class HeartBeatRecvHandler(BaseRequestHandler):52    def handle(self):53        data = self.request.recv(1024).strip().decode('utf-8')54        try:55            self.server.recv_heartbeat(data)56        except ValueError:57            self.server.lock_server.log('[HeartbeatRecv] Could not parse the data as JSON: {}'.format(data))58        finally:59            # close the connection because everything is async60            self.request.close()61class HeartBeatRecvServer(TCPServer):62    allow_reuse_address = True63    last_heartbeat = None64    def __init__ (self, sid: int, lock_server):65        addr = ('localhost', PORT_HEARTBEAT + sid)66        self.heartrecvaddr = addr67        self.sid = sid68        self.lock_server = lock_server69        self.leaderHeartBeat = (-1, None)70        TCPServer.__init__(self, addr, HeartBeatRecvHandler)71    72    def recv_heartbeat(self, data):73        infos = data.split('_')74        leader_id = int(infos[1])75        self.lock_server.pastSlotId = int(infos[2])76        # if len(infos) > 2:77        #     for singleInfo in infos[3:]:78        #         if singleInfo[0] == 'a':79        #             self.lock_server.failed_acceptors.add(int(singleInfo[1:]))80        #         if singleInfo[0] == 'p':81        #             self.lock_server.failed_proposers.add(int(singleInfo[1:]))82        # self.lock_server.log("Receive heart beat from Leader %s" % leader_id)83        if self.lock_server.leader == -1 and leader_id >= 0:84            self.lock_server.log(OKGREEN + "Change leader to {}".format(leader_id) + ENDC)85            self.lock_server.leader = leader_id86            self.leaderHeartBeat = (self.lock_server.leader, time.time())87            if self.lock_server.recover is True:88                self.request_decide()            89        elif self.lock_server.leader == leader_id:90            self.leaderHeartBeat = (self.lock_server.leader, time.time())91        elif self.lock_server.leader != leader_id:92            self.lock_server.log(OKGREEN + "Change leader to {}".format(leader_id) + ENDC)93            self.lock_server.leader = leader_id94            self.leaderHeartBeat = (self.lock_server.leader, time.time())95            if self.lock_server.recover is True:96                self.request_decide()97        # self.lock_server.log('Proposer #{} update heartbeat "{}" from Leader #{}'.format(98        #               self.lock_server.server_id, self.leaderHeartBeat, leader_id))99    100    def request_decide(self):101        send_msg = "requestDecide_%d" % (self.lock_server.server_id)102        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)103        addr = ('localhost', PORT_PROPOSER + self.lock_server.leader)104        sock.settimeout(0.5)105        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)106        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)107        try:108            sock.connect(addr)109            sock.sendall(send_msg.encode('utf-8'))110        except ConnectionRefusedError:111            self.lock_server.log("Recover {} recover fail, please kill the process".format(self.lock_server.server_id))112            pass113        except socket.timeout as e:114            pass115        finally:116            sock.close()117class HeartBeatRecvThread(threading.Thread):118    def __init__(self, sid: int, lock_server):119        threading.Thread.__init__(self)120        self.server = HeartBeatRecvServer(sid, lock_server)121        122    def run(self):123        self.server.lock_server.log("Proposor %d start to run heartbeat receiver at PORT %s" % (self.server.sid, self.server.heartrecvaddr))124        self.server.serve_forever()125class HeartBeatCheckerThread(threading.Thread):126    def __init__(self, heartbeat_server, lock_server):127        threading.Thread.__init__(self)128        self.heartbeat_server = heartbeat_server129        self.lock_server = lock_server130        self.stopped = False131    def run(self):132        while not self.stopped:133            if self.lock_server.is_leader() is False:134                # self.lock_server.log('heartbeat information {}'.format(self.heartbeat_server.leaderHeartBeat))135                if self.heartbeat_server.leaderHeartBeat[0] >= 0 and self.heartbeat_server.leaderHeartBeat[1] is not None:136                    wait_time = (self.lock_server.server_id) * 1 + 1137                    if float(self.heartbeat_server.leaderHeartBeat[1]) + wait_time < time.time():138                        self.lock_server.log('heartbeat check failed, trigger election')139                        print("TimeOut = ", time.time() - self.heartbeat_server.leaderHeartBeat[1] + wait_time)140                        self.lock_server.failed_proposers.add(self.lock_server.leader)141                        self.lock_server.leader = -1142                        self.lock_server.electLeader(increaseProposalNum=True)143                        while self.lock_server.leader == -1:144                            time.sleep(self.lock_server.abortWaitTime)145                            self.lock_server.electLeader(increaseProposalNum=True)146                        # break147                    else:148                        self.abortWaitTime = 1 + self.lock_server.server_id * 0.2149            else:150                self.abortWaitTime = 1 + self.lock_server.server_id * 0.2151            time.sleep(1.0)152class HeartBeatSenderThread(threading.Thread):153    def __init__(self, lock_server):154        threading.Thread.__init__(self)155        self.lock_server = lock_server156        self.stopped = False157    def run(self):158        time.sleep(0.5)159        self.lock_server.log("Proposor %d start to serves heartbeat service" % (self.lock_server.server_id))160        while not self.stopped:161            self.lock_server.send_heartbeat()162            if self.lock_server.recover is True and self.lock_server.leader >= 0:163                data = self.send_requestRecover()164                if data.split('_')[-1].lower() == "end":165                    self.lock_server.log("Recover {}, recover success!".format(self.lock_server.server_id))166                    self.lock_server.recover = False167                else:168                    Cliend_Id, CmdId, SlotId, Cmd = [x for x in data.split('_')[1:]]169                    Cliend_Id = int(Cliend_Id)170                    CmdId = int(CmdId)171                    SlotId = int(SlotId)172                    self.lock_server.pastCommands[(Cliend_Id, CmdId)] = (SlotId, Cmd)173            time.sleep(0.5)174    def send_requestRecover(self):175        send_msg = 'requestRecover_{}'.format(self.lock_server.server_id)176        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)177        addr = ('localhost', PORT_PROPOSER + self.lock_server.leader)178        self.lock_server.log('Recover {} sending "{}" to Leader {}'.format(179          self.lock_server.server_id, send_msg, self.lock_server.leader))180        try:181            sock.connect(addr)182            sock.sendall(send_msg.encode('utf-8'))183            try:184                rtMessage = sock.recv(1024).decode('utf-8')185            except ConnectionResetError as e:186                self.lock_server.log("Recover {} recover fail, please kill the process".format(self.lock_server.server_id))187                raise e188            except socket.timeout as e:189                print("Recover node connect to leader Node {} is Time Out!, please kill the process".format(self.lock_server.server_id))190                raise e191        except ConnectionRefusedError as e:192            self.lock_server.log("Recover {} recover fail, please kill the process".format(self.lock_server.server_id))193            raise e194        finally:195            sock.close()196            return rtMessage197class FetchRecvHandler(BaseRequestHandler):198    def handle(self):199        data = self.request.recv(1024).strip().decode('utf-8')200        infos = data.split('_')201        clientId = int(infos[1])202        lockId = int(infos[2])203        rtMessage = self.server.fetchData(clientId, lockId)204        try:205            self.server.lock_server.log('Sendding result "{}" to client {}'.format(rtMessage, clientId))206            self.request.sendall(rtMessage.encode('utf-8'))207        except ValueError:208            self.server.lock_server.log('[FetchRecv] Could not understand the fetch')209        finally:210            # close the connection because everything is async211            self.request.close()212class FetchRecvServer(TCPServer):213    allow_reuse_address = True214    last_heartbeat = None215    def __init__ (self, sid: int, lock_server):216        addr = ('localhost', PORT_FETCH + sid)217        self.sid = sid218        self.lock_server = lock_server219        TCPServer.__init__(self, addr, FetchRecvHandler)220    221    def fetchData(self, clientId, lockId):222        self.lock_server.log('Client {} fetching data "{},lock No.{}" from Proposor {}'.format(clientId, clientId, lockId, self.lock_server.server_id))223        pastCmdList = []224        printStr = '{'225        for k, v in self.lock_server.pastCommands.items():226            printStr += '(Client ID: %d,  Lamport TimeStamp: %d, Request: %s)' % (k[0], k[1], v[1])227        printStr += '}' 228        self.lock_server.log('Client %d check status of lock %d' % (clientId, lockId))229        self.lock_server.log(OKGREEN + 'Current Processed Sequence Request: {}'.format(printStr) + ENDC)230        for k, v in self.lock_server.pastCommands.items():231            pastCmdList.append((k[0], k[1], v[0], v[1])) # (Cliend Id, CmdId, SlotId, Cmd)232        pastCmdList.sort(key = lambda x: (x[2], x[0], x[1])) # (SlotId, Cliend Id, CmdId)233        self.lock_server.log('Print Past Command = {}'.format(pastCmdList))234        self.lock_server.locks = [None for _ in  self.lock_server.locks]235        for cmd in pastCmdList:236            self.execute(cmd[0], cmd[1], cmd[3])237        if self.lock_server.locks[lockId] == None:238            rtMessage = "Lock %d is free." % lockId239        else:240            rtMessage = "Lock %d is owned by %d." % (lockId, self.lock_server.locks[lockId])241        # elif self.lock_server.locks[lockId] != clientId:242        #     rtMessage = "Lock %d is owned by %d." % (lockId, self.lock_server.locks[lockId])243        # elif self.lock_server.locks[lockId] == clientId:244        #     rtMessage = "Lock %d belongs to you." % lockId245        self.lock_server.locks = [None for _ in  self.lock_server.locks]246        # self.lock_server.log('Data "{},lock No.{}" Result: "{}"'.format(clientId, lockId, rtMessage))247        248        return rtMessage249        250    def execute(self, clientId: int, cmdId: int, command: str):251        if "l" == command[0]:252            lockNode = int(command.split('-')[-1])253            if self.lock_server.locks[lockNode] == None:254                print("Lock %d is locked by %d" % (lockNode, clientId))255                self.lock_server.locks[lockNode] = clientId256            elif self.lock_server.locks[lockNode] == clientId:257                print("Lock %d is already locked by %d" % (lockNode, clientId))258                pass259            elif self.lock_server.locks[lockNode] != clientId:260                print("Lock %d cannot be locked by %d" % (lockNode, clientId))261                pass262        elif "u" == command[0]:263            unlockNode = int(command.split('-')[-1])264            if self.lock_server.locks[unlockNode] == None:265                print("Lock %d is already freed" % unlockNode)266                pass267            elif self.lock_server.locks[unlockNode] == clientId:268                self.lock_server.locks[unlockNode] = None269                print("Lock %d is unlocked by %d" % (unlockNode, clientId))270            elif self.lock_server.locks[unlockNode] != clientId:271                print("Lock %d cannot be unlocked by %d" % (unlockNode, clientId))272                pass273class FetchRecvThread(threading.Thread):274    def __init__(self, sid: int, lock_server):275        threading.Thread.__init__(self)276        self.server = FetchRecvServer(sid, lock_server)277        278    def run(self):279        self.server.lock_server.log("Proposor %d start to serves fetch service" % (self.server.sid))280        self.server.serve_forever()281class RedirectSenderThread(threading.Thread):282    def __init__(self, lock_server):283        threading.Thread.__init__(self)284        self.lock_server = lock_server285        self.stopped = False286        287    def run(self):288        time.sleep(1.0)289        self.lock_server.log("Proposor %d start to serves redirect service" % (self.lock_server.server_id))290        while not self.stopped:291            self.lock_server.redirect()292            time.sleep(1.0)293class LockHandler(BaseRequestHandler):294    '''295    Override the handle method to handle each request296    '''297    def handle(self):298        self.server.log("acquiring...")299        # with self.server.handler_lock:300        data = self.request.recv(1024).strip().decode('utf-8')301        self.server.log('Received message {}'.format(data), verbose=True)302        try:303            # data = json.loads(data)304            infos = data.split('_')305            self.server.log('Infos = {}'.format(infos), verbose=True)306            if infos[0] == 'client':307                self.server.processClient(infos)308                reply_msg = \309                    'Request (Client ID: %d,  Lamport TimeStamp: %d, Request: %s) has been processed' % \310                        (int(infos[1]), int(infos[2]), infos[3])311                self.request.sendall(reply_msg.encode('utf-8'))312            elif infos[0] == 'promise':313                self.server.processPromise(infos)314            elif infos[0] == 'accepted':315                self.server.processAccepted(infos)316            elif infos[0] == 'decide':317                reply_msg = self.server.processDecide(infos)318                self.request.sendall(reply_msg.encode('utf-8'))319            elif infos[0] == 'leader':320                reply_msg = self.server.processLeader(infos)321                self.request.sendall(reply_msg.encode('utf-8'))322            elif infos[0] == 'requestDecide':323                self.server.generateRecoverCommands(int(infos[1]))324                self.server.sendDecideProposers.add(int(infos[1]))325            elif infos[0] == 'requestRecover':326                reply_msg = self.server.processRecoverCommands(int(infos[1]))327                self.request.settimeout(1)328                try:329                    self.request.sendall(reply_msg.encode('utf-8'))330                except ConnectionRefusedError:331                    print("Connection to recover Node {} is Refused!".format(int(infos[1])))332                except socket.timeout as e:333                    print("Connection to recover Node {} is Time Out!".format(int(infos[1])))334                finally:335                    pass336                self.request.settimeout(None)337                if reply_msg.split('_')[-1].lower() == 'end':338                    self.server.sendDecideProposers.discard(int(infos[1]))339            elif infos[0] == 'recover':340                self.server.failed_proposers.discard(int(infos[1]))341                self.server.failed_acceptors.discard(int(infos[1]))342            # else:343            #     self.server.receive_msg(self.request, data)344        except ValueError:345            self.server.log('Could not parse the data as String: {}'.format(data))346        finally:347            # close the connection because everything is async348            self.request.close()349        self.server.log("LockHandler.handle done")350class LockServer(TCPServer):351    # whether the server will allow the reuse of an address352    allow_reuse_address = True353    leader = -1 # initial leader354    is_electing = False # if this node is during the election period355    # state machine356    next_exe = 0 # the next executable command index357    sendDecideProposers = set()358    # an array of command sequence359    # the i-th element corresponds to the result of the i-th paxos instance360    # undecided commands are marked by S_UNSURE361    states = [] # type: List[str]362    pastCommands = {} # Key = (clientId, cmdId), Value = (SlotId, Command, Result)363    recoverCommands = {} # Key = RecoverId, Value = (Cliend Id, CmdId, SlotId, Cmd)364    futureCommands = {} # Key = (clientId, cmdId), Value = Command Message365    locks = [None for _ in range(11)] # locked by client n or None366    proposerNumber = None367    msg_count = 0368    slotId = 0369    pastSlotId = -1370    # stores per instance information371    # leader: prepare responses, accept responses, proposal number372    # follower: highest promised proposal number, highest accept373    progress = {} # type: Dict[int, Any]374    failed_proposers = set() # type: Set[int]375    failed_acceptors = set()376    377    handler_lock = threading.Lock()378    waiting = W_OK379    waiting_settime = 0380    wait_args = None381    # constructor382    def __init__ (self, sid: int, total: int, recover=False):383        self.server_id = sid384        self.total_nodes = total385        self.proposal_Number = total - sid386        self.abortWaitTime = 1 + self.server_id * 0.2387        self.recover = recover388        addr = ('localhost', PORT_PROPOSER + sid)389        TCPServer.__init__(self, addr, LockHandler)390        self.waitTime = 1 + sid * 0.2391        if recover is False:392            while(self.leader == -1 and self.server_id == 0):393                time.sleep(self.abortWaitTime)394                self.electLeader()395    def generateRecoverCommands(self, recoverId):396        if len(self.recoverCommands.get(recoverId, [])) == 0:397            self.log("Generate Recover Command for Recover Node %d" % recoverId)398            self.recoverCommands[recoverId] = []399            for xx in self.pastCommands.items(): # ((Cliend Id, CmdId), (SlotId, Cmd))400                print("Recover Command Generated = ", xx)401                self.recoverCommands[recoverId].append([str(xx[0][0]), str(xx[0][1]), str(xx[1][0]), str(xx[1][1])])402        403    # TODO: Add this back, send heartbeat to followers404    def send_heartbeat(self):405        if self.server_id == self.leader:406            msg = self.server_id407            send_msg = "heart_%d_%d" % (self.server_id, self.slotId)408            for i in range(self.total_nodes):409                if not i == self.server_id:410                    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)411                    addr = ('localhost', PORT_HEARTBEAT + i)412                    # self.log('Leader #{} sending "{}" to Proposor #{}'.format(413                    #   self.server_id, send_msg, i))414                    try:415                        sock.connect(addr)416                        sock.sendall(send_msg.encode('utf-8'))417                    except ConnectionRefusedError:418                        pass419                        # print("Connection to proposer {} is Refused!".format(i))420                        # if i not in self.failed_proposers:                        421                        #     self.failed_proposers.add(i)422                    finally:423                        sock.close()424    # TODO: Add this back, send heartbeat to followers425    def processRecoverCommands(self, recoverId):426        # sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)427        # sock.settimeout(0.5)428        # sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)429        # sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)430        # addr = ('localhost', PORT_PROPOSER + recoverId)431        if len(self.recoverCommands[recoverId]) > 0:432            send_msg = "recoverCommand_%s" % '_'.join(self.recoverCommands[recoverId].pop())433        else:434            send_msg = "recoverCommand_end"435        # try:436        #     sock.connect(addr)437        #     sock.sendall(send_msg.encode('utf-8'))438        # except ConnectionRefusedError:439        #     print("Connection to recover Node {} is Refused!".format(recoverId))440        #     send_msg = 'fail'441        # except socket.timeout as e:442        #     print("Connection to recover Node {} is Time Out!".format(recoverId))443        #     send_msg = 'fail'444        # finally:445        #     sock.close()446        return send_msg447    # TODO: Add this back, send message to leaders448    def redirect(self):449        removeCmds = []450        for k in self.futureCommands.keys():451            if self.pastCommands.get(k) is not None:452                removeCmds.append(k)453        for rmk in removeCmds:454            self.futureCommands.pop(rmk)455        for k in self.futureCommands.keys():456            if self.pastCommands.get(k) is None:457                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)458                sock.settimeout(1.0)459                sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)460                sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)461                other_node = random.choice([xx for xx in range(self.total_nodes) if xx not in self.failed_proposers and xx != self.leader])462                if self.leader == self.server_id or self.leader == -1:463                    addr = ('localhost', PORT_PROPOSER + other_node)464                    self.log('Leader {} redirect "{}" to Proposer {}'.format(465                        self.server_id, self.futureCommands[k], other_node))                    466                else:467                    addr = ('localhost', PORT_PROPOSER + self.leader)468                    self.log('Proposer {} redirect "{}" to Leader {}'.format(469                        self.server_id, self.futureCommands[k], self.leader))470                try:471                    sock.connect(addr)472                    sock.sendall(self.futureCommands[k].encode('utf-8'))473                except ConnectionRefusedError:474                    pass475                    # print("Connection Refused!!!")476                except socket.timeout as e:477                    self.log("socket timeout {}\nRedirect from Proposer{}: to Leader: {}".format(e, self.server_id, self.leader))478                finally:479                    sock.close()480                    break481    # TODO: Add this back, send heartbeat to followers482    def recv_heartbeat(self, data):483        leader_id = int(data.split('_')[1])484        timestamp = float(data.split('_')[2])485        486        if self.leader == -1 and leader_id >= 0:487            self.leader = leader_id488            self.leaderHeartBeat = (self.leader, time.time())489        elif self.leader == leader_id:490            self.leaderHeartBeat = (self.leader, time.time())491        elif self.leader != leader_id:492            self.leader = leader_id493            self.leaderHeartBeat = (self.leader, time.time())494        # self.log('Proposer #{} Receiving "{}" from Leader #{}'.format(495        #               self.server_id, data, self.leader))496        return497    def log (self, msg, verbose=False):498        if (not VERBOSE) and verbose:499            return500        print('[S{}] {}'.format(self.server_id, msg))501    def is_leader (self):502        return self.leader == self.server_id503    def electLeader(self, increaseProposalNum=False):504        self.abortWaitTime *= 1.5505        if increaseProposalNum is True:506            self.proposal_Number += self.total_nodes507        send_msg = 'prepare_%d_%d' % (self.proposal_Number, self.server_id)508        tmpRecvs = []509        if self.leader != -1: return510        for i in range(self.total_nodes):511            if i in self.failed_acceptors:512                continue513            tmpRecvs.append(None)514            tmpRecvs[-1] = self.send_prepare(defaultPORT=PORT_ACCEPTOR, acceptorId=i, clientId=-1, cmdId=-1, payload=send_msg)515            if "abort" in tmpRecvs[-1]:516                return517        validRecvs = [(int(xx.split('_')[-1]), xx) for xx in tmpRecvs if "promise" in xx]518        if len(validRecvs) >= (self.total_nodes // 2) + 1:519            recvNoVal = {}520            for _, recv in validRecvs:521                recvNo = int(recv.split('_')[1])522                recvVal = int(recv.split('_')[2])523                if recvVal >= 0:524                    recvNoVal[recvNo] = recvVal525            if len(recvNoVal.keys()) > 0:526                self.log("Receive Accepted Value = {}".format(int(recvNoVal[sorted(recvNoVal.keys())[-1]])))527                self.log("FAILED_PROPOSERS = {}".format(self.failed_proposers))528            if len(recvNoVal.keys()) == 0:529                myVal = self.server_id530            elif int(recvNoVal[sorted(recvNoVal.keys())[-1]]) in self.failed_proposers:531                myVal = self.server_id532            else:533                myVal = recvNoVal[sorted(recvNoVal.keys())[-1]]534            send_msg = 'accept_' + str(self.proposal_Number) + '_' + str(myVal) + '_' + str(self.server_id)535            self.log("Sendding Accept Message = {}".format(send_msg))536            tmpRecvs = []537            validAccepted = 0538            for accId, _ in validRecvs:                    539                if i in self.failed_acceptors:540                    continue541                tmpRecvs.append(None)542                tmpRecvs[-1] = self.send_elect_accept(defaultPORT=PORT_ACCEPTOR, acceptorId=accId, payload=send_msg)543                if 'accepted' in tmpRecvs[-1]:544                    validAccepted += 1545                # if validAccepted >= self.total_nodes // 2 + 1:546                #     break547            if validAccepted >= self.total_nodes // 2 + 1:548                append_msg = ''549                if len(self.failed_proposers) > 0:550                    append_msg += '_' + ('_').join([str(x) for x in self.failed_proposers])551                send_msg = 'leader_' + str(myVal) + append_msg552                for i in range(self.total_nodes):553                    if i in self.failed_proposers or i == self.server_id:554                        continue555                    reply_msg = self.send_elect_decide(defaultPORT=PORT_PROPOSER, proposerId=i, payload=send_msg, toWhom='proposer')556                    if "leader" not in reply_msg or int(reply_msg.split('_')[1]) != myVal:557                        break558                self.leader = myVal559                self.slotId = self.pastSlotId + 1560                return561        self.proposal_Number = self.proposal_Number + self.total_nodes562    def processLeader(self, infos):563        leaderId = int(infos[1])564        if len(infos) > 2:565            for xx in infos[2:]:566                self.failed_proposers.add(int(xx))567        self.leader = leaderId568        self.log(OKGREEN + "Change leader to {}".format(self.leader) + ENDC)569        # self.log("Proposer%d's choose %d as leader" % (self.server_id, self.leader))570        571        reply_msg = 'leader_' + str(self.leader)572        573        return reply_msg574    # send prepare messages to all nodes575    def send_prepare(self, defaultPORT: int, acceptorId: int, clientId: int, cmdId: int, payload):576        if random.random() > 1.00:577            self.log("^" * 80 + "\ndropping {} to {}\n".format(payload, acceptorId) + "^" * 80)578            return "TimeOut"579        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)580        sock.settimeout(1)581        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)582        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)583        addr = ('localhost', defaultPORT + acceptorId)584        try:585            sock.connect(addr)586            sock.sendall((payload).encode('utf-8'))587            self.log('Proposer #{} Sending prepare_{} to Acceptor #{}'.format(588                      self.server_id, payload.split('_')[1], acceptorId))589            try:590                rtMessage = sock.recv(1024).decode('utf-8')591            except ConnectionResetError as e:592                self.log(">>>>>>\nacceptorId: {}\npayload: {}\n<<<<<<<".format(acceptorId, payload))593                raise e594        except ConnectionRefusedError:595            self.log("Acceptor #{} is down, removing it".format(acceptorId))596            self.failed_acceptors.add(acceptorId)597            rtMessage = "NoAcceptor"598        except socket.timeout as e:599            self.log("socket timeout {}\nAcceptor_id: {}\nCommandInfo: ({}, {})".format(e, acceptorId, clientId, cmdId))600            601        finally:602            sock.close()603            return rtMessage604            605    # send decide messages to other proposers(replicas)606    def send_elect_decide(self, defaultPORT: int, proposerId: int, payload, toWhom):607        if random.random() > 1.00:608            self.log("^" * 80 + "\ndropping {} to {}\n".format(payload, proposerId) + "^" * 80)609            time.sleep(1)610            return "TimeOut"611        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)612        sock.settimeout(1)613        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)614        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)615        addr = ('localhost', defaultPORT + proposerId)616        try:617            sock.connect(addr)618            sock.sendall((payload).encode('utf-8'))619            self.log('Proposer #{} Sending decide Leader #{} to {} #{}'.format(620                      self.server_id, self.server_id, toWhom, proposerId))621            try:622                return sock.recv(1024).decode('utf-8')623            except ConnectionResetError as e:624                self.log(">>>>>>\n{}}Id: {}\npayload: {}\n<<<<<<<".format(toWhom, proposerId, payload))625                raise e626        except ConnectionRefusedError:627            self.log("{} #{} is down, removing it".format(toWhom, proposerId))628            if toWhom.lower() == "proposer":629                self.failed_proposers.add(proposerId)630                return "NoProposer"631            elif toWhom.lower() == "acceptor":632                self.failed_acceptors.add(proposerId)633                return "Acceptor"634        except socket.timeout as e:635            self.log("socket timeout {}\n To {}_id #{} Elect Leader #{}".format(e, toWhom, proposerId, self.server_id))636            return "TimeOut"637        finally:638            sock.close()639    def send_elect_accept(self, defaultPORT: int, acceptorId: int, payload):640        if random.random() > 1.00:641            self.log("^" * 80 + "\ndropping {} to {}\n".format(payload, acceptorId) + "^" * 80)642            return "TimeOut"643        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)644        sock.settimeout(1)645        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)646        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)647        addr = ('localhost', defaultPORT + acceptorId)648        try:649            sock.connect(addr)650            sock.sendall((payload).encode('utf-8'))651            self.log('Proposer #{} Sending elect accept to Acceptor #{}'.format(652                      self.server_id, acceptorId))653            try:654                return sock.recv(1024).decode('utf-8')655            except ConnectionResetError as e:656                self.log(">>>>>>\nacceptorId: {}\npayload: {}\n<<<<<<<".format(acceptorId, payload))657                raise e658        except ConnectionRefusedError:659            self.log("Acceptor #{} is down, removing it".format(acceptorId))660            self.failed_acceptors.add(acceptorId)661            return "NoAcceptor"662        except socket.timeout as e:663            self.log("socket timeout {}\nAcceptor_id: {}\nCommandInfo: ({}, {})".format(e, acceptorId, clientId, cmdId))664            return "TimeOut"665        finally:666            sock.close()667    def send_command_accept(self, defaultPORT: int, acceptorId: int, clientId: int, cmdId: int, payload):668        if random.random() > 1.00:669            self.log("^" * 80 + "\ndropping {} to {}\n".format(payload, acceptorId) + "^" * 80)670            return "TimeOut"671        rtMessage = ""672        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)673        sock.settimeout(1)674        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)675        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)676        addr = ('localhost', defaultPORT + acceptorId)677        try:678            sock.connect(addr)679            sock.sendall((payload).encode('utf-8'))680            self.log('Proposer #{} Sending command "{}" to Acceptor #{}'.format(681                      self.server_id, payload, acceptorId))682            try:683                rtMessage = sock.recv(1024).decode('utf-8')684            except ConnectionResetError as e:685                self.log(">>>>>>\nacceptorId: {}\npayload: {}\n<<<<<<<".format(acceptorId, payload))686                rtMessage = "Failed"687                raise e688        except ConnectionRefusedError:689            self.log("Acceptor #{} is down, removing it".format(acceptorId))690            self.failed_acceptors.add(acceptorId)691            rtMessage = "Failed"692        except socket.timeout as e:693            self.log("socket timeout {}\nAcceptor_id: {}\nCommandInfo: ({}, {})".format(e, acceptorId, clientId, cmdId))694            rtMessage = "TimeOut"695        finally:696            sock.close()697            return rtMessage698    def send_command_decide(self, defaultPORT: int, proposerId: int, clientId: int, cmdId: int, payload):699        if random.random() > 1.00:700            self.log("^" * 80 + "\ndropping {} to {}\n".format(payload, proposerId) + "^" * 80)701            time.sleep(1)702            return "TimeOut"703        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)704        sock.settimeout(1)705        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)706        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)707        addr = ('localhost', defaultPORT + proposerId)708        try:709            sock.connect(addr)710            sock.sendall((payload).encode('utf-8'))711            self.log('SlotID: #{}, Proposer #{} Sending decide command {} to Proposer #{}'.format(712                      self.slotId, self.server_id, payload, proposerId))713            try:714                rtMessage = sock.recv(1024).decode('utf-8')715            except ConnectionResetError as e:716                self.log(">>>>>>\nproposerId: {}\npayload: {}\n<<<<<<<".format(proposerId, payload))717                raise e718        except ConnectionRefusedError:719            self.log("Proposer #{} is down, removing it".format(proposerId))720            self.failed_proposers.add(proposerId)721            rtMessage = 'No Proposer'722        except socket.timeout as e:723            self.log("socket timeout {}\nProposer_id: {}\nCommandInfo: ({}, {})".format(e, proposerId, clientId, cmdId))724            rtMessage = 'No Proposer'725        finally:726            sock.close()727            return rtMessage728    def processDecide(self, infos):729        clientId = int(infos[1])730        cmdId = int(infos[2])731        cmdContent = infos[3]732        slotId = int(infos[4])733        # assert slotId == self.slotId, "Proposer%d's Slot ID %d does not equal to Recv Slot ID %d" % (slotId, slotId, self.slotId)734        if self.pastCommands.get((clientId, cmdId), False) is False:735            self.slotId = slotId736            self.pastCommands[(clientId, cmdId)] = (slotId, cmdContent)737        reply_msg = 'stored_' + str(clientId) + '_' + str(cmdId) + '_' + cmdContent738        739        return reply_msg740    # handle a client connection741    def processClient(self, infos):742        clientId = infos[1]743        cmdId = infos[2]744        cmdContent = infos[3]745        746        if self.pastCommands.get((clientId, cmdId), False) is not False:747            self.log("Command ({}, {}, {}) is already executed".format(clientId, cmdId, cmdContent))748            return749        elif self.is_leader() is False:750            self.log("I am not leader, store Command ({}, {}, {}) in futureCommands".format(clientId, cmdId, cmdContent))751            self.futureCommands[(int(clientId), int(cmdId))] = ('_').join(infos)752            return753        else:754            self.log("I am leader, store Command ({}, {}, {}) in futureCommands".format(clientId, cmdId, cmdContent))755            self.futureCommands[(int(clientId), int(cmdId))] = ('_').join(infos)756            self.log("I am leader, send Command ({}, {}, {}) to Acceptors".format(clientId, cmdId, cmdContent))757            send_msg = 'accept_' + str(self.proposal_Number) + '_' + cmdContent + '_' + str(self.server_id)758            tmpRecvs = []759            for acceptorId in range(self.total_nodes):760                if acceptorId not in self.failed_acceptors:761                    tmpRecvs.append(None)762                    tmpRecvs[-1] = self.send_command_accept(763                        defaultPORT=PORT_ACCEPTOR, 764                        acceptorId=acceptorId, 765                        clientId=clientId, 766                        cmdId=cmdId, 767                        payload=send_msg,768                    )769                    self.log("Receive accepted message {} from Acceptor {}".format(tmpRecvs[-1], acceptorId))770            NumAccepted = len([xx for xx in tmpRecvs if isinstance(xx, str) and 'accepted' in xx])771            if NumAccepted < (self.total_nodes // 2) + 1:772                773                self.log("Reply from acceptor is not enough({}), store Command ({}, {}, {}) to futureCommands".format(NumAccepted, clientId, cmdId, cmdContent))774                self.futureCommands[(int(clientId), int(cmdId))] = ('_').join(infos)775                # reply_msg = 'resend_' + str(clientId) + '_' + str(cmdId) + '_' + str(cmdContent) + '_' + str(self.server_id)776            else:777                decide_msg = 'decide_' + str(clientId) + '_' + str(cmdId) + '_' + str(cmdContent) + '_' + str(self.slotId)778                tmpRecvs = []779                self.log("I am leader, send Decide of Command ({}, {}, {}) to Proposers".format(clientId, cmdId, cmdContent))780                for proposerId in range(self.total_nodes):781                    if proposerId != self.server_id and proposerId not in self.failed_proposers:    782                        tmpRecvs.append(None)783                        tmpRecvs[-1] = self.send_command_decide(784                            defaultPORT=PORT_PROPOSER, 785                            proposerId=proposerId, 786                            clientId=clientId, 787                            cmdId=cmdId, 788                            payload=decide_msg,789                        )790                self.processDecide(decide_msg.split('_'))791                for proposerId in self.sendDecideProposers:792                    self.send_command_decide(793                        defaultPORT=PORT_PROPOSER, 794                        proposerId=proposerId, 795                        clientId=clientId, 796                        cmdId=cmdId, 797                        payload=decide_msg,798                    )799                self.slotId += 1800                # NumExecuted = len([xx for xx in tmpRecvs if isinstance(xx, str) and 'stored' in xx])801                # if NumExecuted == (self.total_nodes // 2):802                #     self.futureCommands[(int(clientId), int(cmdId))] = infos.joins('_')803                # else:804                #     if self.pastCommands.get((clientId, cmdId), False) is False:805                #         pastCommandsPair = self.execute(clientId, cmdId, cmdContent)806                #         self.slotId += 1807                #         self.pastCommands[(clientId, cmdId)] = pastCommandsPair808                #     reply_msg = 'proceeded_' + str(clientId) + '_' + str(cmdId) + '_' + self.pastCommands[(clientId, cmdId)][2]809                #     return reply_msg810    def log_progress(self):811        print("*" * 80 + "\n" + self.progress_str() + "\n" + "*" * 80)812    # when receiving a prepare reply813    # def on_prepare_reply (self, reply):814    #     if not self.is_leader() and not self.is_electing:815    #         return # only leader follows up with accept816    #     idx = reply['instance']817    #     # stores info into progress818    #     pg_ref = self.progress[idx]819    #     prep_ref = pg_ref['prepare']820    #     if reply['server_id'] in prep_ref:821    #         assert prep_ref[reply['server_id']] == reply, \822    #                 '{} != {}'.format(prep_ref[reply['server_id']], reply)823    #     prep_ref[reply['server_id']] = reply824    #     # do we receive a majority of replies?825    #     if len(prep_ref) > self.total_nodes / 2:826    #         # find out the highest numbered proposal827    #         hn = -1828    #         hv = ''829    #         for sid, replied_reply in prep_ref.items():830    #             if replied_reply['prep_n'] > hn:831    #                 hn = replied_reply['prep_n']832    #                 hv = replied_reply['prep_value']833    #         assert hn >= 0834    #         reply['command'] = hv835    #         self.send_accept(reply)836        837    838                    839    # send learn messages to all nodes840    # def send_learn (self, data, cmd):841    #     payload = self.init_payload(data)842    #     payload['type'] = T_LEARN843    #     payload['command'] = cmd844    #     replies = self.send_all(payload)845    #     reply = self.check_stale(replies)846    #     if reply is not None:847    #         self.on_learn(reply)848    #         return849    #     payload['server_id'] = self.server_id850    #     self.on_learn(payload)851    # when a learn message is received852    # def on_learn (self, request):853    #     idx = request['instance']854    #     cmd = request['command']855    #     # update states856    #     self.init_states_to(idx)857    #     self.states[idx] = cmd858    #     # update progress859    #     self.init_progress(idx)860    #     self.progress[idx]['command'] = cmd861    #     if 'client' in request:862    #         self.progress[idx]['client'] = request['client']863    #     # execute if possible864    #     self.execute()865    #     # debug866    #     self.log(self.states, verbose=True)867    #     self.log(self.progress, verbose=True)868    # execute the commands869    # def execute(self):870        871    #     for i in range(self.next_exe, len(self.states)):872    #         cmd = self.states[i]873    #         if cmd == S_UNSURE:874    #             break # a hole in the sequence, we can't execute further875    #         self.next_exe += 1876    #         if cmd.startswith(S_ELECT_LEADER_PREFIX):877    #             new_leader_id = int(cmd[len(S_ELECT_LEADER_PREFIX):])878    #             self.log("resetting leader to {}".format(new_leader_id))879    #             self.leader = new_leader_id880    #         elif cmd.startswith("lock_"):881    #             n_lock = int(cmd.lstrip("lock_"))882    #             assert n_lock >= 0, str(n_lock)883    #             assert n_lock < len(self.locks), str(n_lock)884    #             client = self.progress[i]['client']885    #             if self.locks[n_lock] is None:886    #                 self.locks[n_lock] = client887    #         elif cmd.startswith("unlock_"):888    #             n_lock = int(cmd.lstrip("unlock_"))889    #             assert n_lock >= 0, str(n_lock)890    #             assert n_lock < len(self.locks), str(n_lock)891    #             client = self.progress[i]['client']892    #             if self.locks[n_lock] == client:893    #                 self.locks[n_lock] = None894    #         else:895    #             raise Exception("unknown command: " + cmd)896    # insert initial instance if it doesn't exist before897    # def init_progress(self, idx: int):898    #     if not idx in self.progress:899    #         self.progress[idx] = INIT_PROGRESS()900            901    # insert values into states up to idx902    # def init_states_to(self, idx):903    #     end = len(self.states)904    #     if len(self.states) > idx:905    #         return # the idx already exists906    #     for i in range(end, idx + 1):907    #         self.states.append(S_UNSURE) # fill holes with S_UNSURE908    # copy re-usable fields into a new payload object909    # def init_payload (self, payload):910    #     stripped = {911    #         'instance': payload['instance']912    #     }913    #     if 'client' in payload:914    #         stripped['client'] = payload['client']915    #     return stripped916    # send a message to peer and immediately close connection917    # def send_msg(self, defaultPORT: int, target_id: int, payload, recv=True):918    #     if random.random() > 0.95:919    #         self.log("^" * 80 + "\ndropping {} to {}\n".format(payload, target_id) + "^" * 80)920    #         return # randomly drop message921    #     # self.msg_count += 1922    #     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)923    #     if target_id == self.leader:924    #         sock.settimeout((self.server_id + random.random()) * 3)925    #     else:926    #         sock.settimeout(1)927    #     sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)928    #     sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)929    #     addr = ('localhost', defaultPORT + target_id)930    #     # payload['source'] = 'server'931    #     # payload['server_id'] = self.server_id932    #     try:933    #         sock.connect(addr)934    #         sock.sendall(json.dumps(payload).encode('utf-8'))935    #         self.log('Paxos #{}, sending {} to {}'.format(payload['instance'],936    #                   payload, target_id))937    #         if recv:938    #             try:939    #                 return json.loads(sock.recv(1024).decode('utf-8'))940    #             except ConnectionResetError as e:941    #                 self.log(">>>>>>\ntarget_id: {}\npayload: {}\n<<<<<<<".format(target_id, payload))942    #                 raise e943    #     except ConnectionRefusedError:944    #         self.log("peer {} is down, removing it".format(addr))945    #         self.failed_nodes.add(target_id)946    #         if target_id == self.leader:947    #             self.log("send_msg to leader {} failed, trigger election".format(addr))948    #             self.elect_leader()949    #     except socket.timeout as e:950    #         self.log("socket timeout {}\ntarget_id: {}\npayload: {}".format(e, target_id, payload))951    #         raise e952    #     finally:953    #         sock.close()954    # send to all followers (every nodes except myself)955    # def send_all (self, payload):956    #     self.log("send_all({})".format(payload))957    #     replies = []958    #     for i in range(1, self.total_nodes + 1):959    #         if not i == self.server_id and not (i in self.failed_nodes):960    #             self.log('send_all from {} to {}'.format(self.server_id, i))961    #             reply = self.send_msg(i, payload)962    #             if reply is not None:963    #                 replies.append((i, reply))964    #     return replies965    # check whether this server is the current leader966    # def notify(self):967    #     with self.handler_lock:968    #         if self.waiting == W_OK:969    #             return970    #         if time.time() > self.waiting_settime + WAITING_TIMEOUT:971    #             if self.waiting == W_WAITING_PREPARE_REPLIES:972    #                 self.send_prepare(*self.wait_args)973    #             elif self.waiting == W_WAITING_ACCEPT_REPLIES:974    #                 self.send_accept(*self.wait_args)975    #             else:976    #                 assert False, "invalid waiting: {}".format(self.waiting)977    # def elect_leader(self):978    #     with self.handler_lock:979    #         if self.leader not in self.failed_nodes:980    #             self.failed_nodes.add(self.leader)981    #         assert not self.is_electing982    #         self.is_electing = True983    #         idx = self.get_next_instance_idx()984    #         data = {985    #             'instance': idx,986    #             'command': S_ELECT_LEADER(self.server_id)987    #         }988    #         assert idx not in self.progress989    #         self.init_progress(idx)990    #         data['proposal_n'] = self.bump_next_proposal_n(idx)991    #         self.send_prepare(data)992# start server993def start (sid, total, recover=False):994    server = LockServer(sid, total, recover=recover)995    heartbeat_server_thr = HeartBeatRecvThread(sid, server)996    heartbeat_server_thr.start()997    heartbeat_sender_thr = HeartBeatSenderThread(server)998    heartbeat_sender_thr.start()999    redirect_sender_thr = RedirectSenderThread(server)1000    redirect_sender_thr.start()1001    fetch_server_thr = FetchRecvThread(sid, server)1002    fetch_server_thr.start()1003    heartbeat_checker_thr = HeartBeatCheckerThread(heartbeat_server_thr.server, server)1004    heartbeat_checker_thr.start()1005    # handle signal1006    def sig_handler (signum, frame):1007        heartbeat_checker_thr.stopped = True1008        heartbeat_sender_thr.stopped = True1009        redirect_sender_thr.stopped = True1010        heartbeat_checker_thr.join()1011        heartbeat_sender_thr.join()1012        redirect_sender_thr.join()1013        heartbeat_server_thr.server.shutdown()1014        heartbeat_server_thr.server.server_close()1015        heartbeat_server_thr.join()1016        fetch_server_thr.server.shutdown()1017        fetch_server_thr.server.server_close()1018        fetch_server_thr.join()1019        server.server_close()1020        exit(0)1021    # register signal handler1022    signal.signal(signal.SIGINT, sig_handler)1023    signal.signal(signal.SIGTERM, sig_handler)1024    # serve until explicit shutdown1025    ip, port = server.server_address1026    server.log('Listening on {}:{} ...'.format(ip, port))1027    server.serve_forever()1028if __name__ == "__main__":1029    if len(sys.argv) < 3:1030        print('Usage:\npython paxos_processor.py [server_id] [total_nodes] [-v]')1031        exit(0)1032    if len(sys.argv) == 4 and "v" in sys.argv[3]:1033        VERBOSE = True1034    else:1035        VERBOSE = False1036    if len(sys.argv) == 4 and "r" in sys.argv[3]:1037        RECOVER = True1038    else:1039        RECOVER = False1040    print("Start Recover Mode...")...test_lock_server.py
Source:test_lock_server.py  
1# Copyright 2011 OpenStack Foundation2# Copyright 2013 IBM Corp.3#4#    Licensed under the Apache License, Version 2.0 (the "License"); you may5#    not use this file except in compliance with the License. You may obtain6#    a copy of the License at7#8#         http://www.apache.org/licenses/LICENSE-2.09#10#    Unless required by applicable law or agreed to in writing, software11#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT12#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the13#    License for the specific language governing permissions and limitations14#    under the License.15import webob16import mock17from nova.api.openstack import common18from nova.api.openstack.compute.legacy_v2.contrib import admin_actions \19        as lock_server_v220from nova.api.openstack.compute import lock_server as lock_server_v2121from nova import context22from nova import exception23from nova import test24from nova.tests.unit.api.openstack.compute import admin_only_action_common25from nova.tests.unit.api.openstack import fakes26from nova.tests.unit import fake_instance27class LockServerTestsV21(admin_only_action_common.CommonTests):28    lock_server = lock_server_v2129    controller_name = 'LockServerController'30    authorization_error = exception.PolicyNotAuthorized31    _api_version = '2.1'32    def setUp(self):33        super(LockServerTestsV21, self).setUp()34        self.controller = getattr(self.lock_server, self.controller_name)()35        self.compute_api = self.controller.compute_api36        def _fake_controller(*args, **kwargs):37            return self.controller38        self.stubs.Set(self.lock_server, self.controller_name,39                       _fake_controller)40        self.mox.StubOutWithMock(self.compute_api, 'get')41    def test_lock_unlock(self):42        self._test_actions(['_lock', '_unlock'])43    def test_lock_unlock_with_non_existed_instance(self):44        self._test_actions_with_non_existed_instance(['_lock', '_unlock'])45    def test_unlock_not_authorized(self):46        self.mox.StubOutWithMock(self.compute_api, 'unlock')47        instance = self._stub_instance_get()48        self.compute_api.unlock(self.context, instance).AndRaise(49                exception.PolicyNotAuthorized(action='unlock'))50        self.mox.ReplayAll()51        body = {}52        self.assertRaises(self.authorization_error,53                          self.controller._unlock,54                          self.req, instance.uuid, body)55class LockServerTestsV2(LockServerTestsV21):56    lock_server = lock_server_v257    controller_name = 'AdminActionsController'58    authorization_error = webob.exc.HTTPForbidden59    _api_version = '2'60class LockServerPolicyEnforcementV21(test.NoDBTestCase):61    def setUp(self):62        super(LockServerPolicyEnforcementV21, self).setUp()63        self.controller = lock_server_v21.LockServerController()64        self.req = fakes.HTTPRequest.blank('')65    def test_lock_policy_failed(self):66        rule_name = "os_compute_api:os-lock-server:lock"67        self.policy.set_rules({rule_name: "project:non_fake"})68        exc = self.assertRaises(69                                exception.PolicyNotAuthorized,70                                self.controller._lock, self.req,71                                fakes.FAKE_UUID,72                                body={'lock': {}})73        self.assertEqual(74                      "Policy doesn't allow %s to be performed." % rule_name,75                      exc.format_message())76    def test_unlock_policy_failed(self):77        rule_name = "os_compute_api:os-lock-server:unlock"78        self.policy.set_rules({rule_name: "project:non_fake"})79        exc = self.assertRaises(80                                exception.PolicyNotAuthorized,81                                self.controller._unlock, self.req,82                                fakes.FAKE_UUID,83                                body={'unlock': {}})84        self.assertEqual(85                      "Policy doesn't allow %s to be performed." % rule_name,86                      exc.format_message())87    @mock.patch.object(common, 'get_instance')88    def test_unlock_policy_failed_with_unlock_override(self,89                                                       get_instance_mock):90        ctxt = context.RequestContext('fake', 'fake')91        instance = fake_instance.fake_instance_obj(ctxt)92        instance.locked_by = "fake"93        get_instance_mock.return_value = instance94        rule_name = ("os_compute_api:os-lock-server:"95                     "unlock:unlock_override")96        rules = {"os_compute_api:os-lock-server:unlock": "@",97                 rule_name: "project:non_fake"}98        self.policy.set_rules(rules)99        exc = self.assertRaises(100            exception.PolicyNotAuthorized, self.controller._unlock,101            self.req, fakes.FAKE_UUID, body={'unlock': {}})102        self.assertEqual(103            "Policy doesn't allow %s to be performed." % rule_name,...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
