Best Python code snippet using localstack_python
sns_listener.py
Source:sns_listener.py  
...
    sns_backend = SNSBackend.get()
    subscriptions = sns_backend.sns_subscriptions.get(topic_arn, [])

    async def wait_for_messages_sent():
        subs = [
            message_to_subscriber(
                message_id,
                message,
                topic_arn,
                req_data,
                headers,
                subscription_arn,
                skip_checks,
                sns_backend,
                subscriber,
                subscriptions,
            )
            for subscriber in list(subscriptions)
        ]
        if subs:
            await asyncio.wait(subs)

    asyncio.run(wait_for_messages_sent())


async def message_to_subscriber(
    message_id,
    message,
    topic_arn,
    req_data,
    headers,
    subscription_arn,
    skip_checks,
    sns_backend,
    subscriber,
    subscriptions,
):
    if subscription_arn not in [None, subscriber["SubscriptionArn"]]:
        return
    filter_policy = json.loads(subscriber.get("FilterPolicy") or "{}")
...
sub.py
Source:sub.py  
#!/usr/bin/python3
import socket
import sys
import getopt
import time
import threading

'''The following variables:
 PORT_SUB
 HOST
 PORT_BROK
 SUB_ID
 are defined in the if __name__ == "__main__": block at the end, from the
 command-line arguments (sys.argv) that the user supplied when running the program.'''

'''The string message_to_subscriber below is shown to the operator of the subscriber
    program when the commands of the .cmd file run out, or when no .cmd file was given.
    It is also shown again after every manual command that raises no errors,
    until the user quits subscribing/unsubscribing.'''
message_to_subscriber='''If you want to quit subscribing there are four ways:\n
    a) Type quit
    b) Give empty input
    c) Give an input that does not start with an integer followed by a space.
    d) Perform KeyboardInterrupt
    In any other case I will assume that you are trying to subscribe to or unsubscribe from a topic.
    If you want to subscribe somewhere, the command must be formed as follows:
    10 sub #world
    If you want to unsubscribe from somewhere, the command must be formed as follows:
    300 unsub #hello
    The first column represents the number of seconds that the subscriber should wait before executing that
    command (in the above cases 10 or 300). This number should be greater than or equal to 0. The second column represents the command to execute:
     this will always be sub or unsub for the subscriber. The third column represents the topic that the subscriber will subscribe to or unsubscribe
     from. All topics, both for publisher and subscriber, are one keyword. If you do not use this format, and you have not quit this program,
      you will not subscribe to or unsubscribe from anything, but you will be asked again to give something as input.
    '''


# Receives another function and runs it as a daemon thread
def start_thread_daemon(thread_func):
    thread = threading.Thread(target=thread_func, daemon=True)
    thread.start()


# The first 4 arguments are mandatory when calling the file (only -f is optional)
def gethelp(argv):
    arg_help = "-i <id> -h <ip> -p <brok> -r <sub> -f <file.cmd (optional)>"
    try:
        # f without ":" because -f is optional
        opts, args = getopt.getopt(argv[1:], "si:h:p:r:f")
    except getopt.GetoptError:
        # Print a message that helps the user call the program properly (defined below)
        stop_wrong_import(arg_help)

    # Fewer than 4 options (or more than 5) were given, so stop
    if len(opts) != 4 and len(opts) != 5:
        stop_wrong_import(arg_help)
    elif len(opts) == 4:
        for opt, val in opts:
            # Someone needs help: -s was typed as an argument
            if 's' in opt:
                stop_wrong_import(arg_help)
            if opt not in ['-i', '-h', '-p', '-r']:
                stop_wrong_import(arg_help)
            elif val == '-f':
                stop_wrong_import(arg_help)


def stop_wrong_import(message):
    print('Please note that all of the following arguments are mandatory:')
    print(message)
    print('The 4 mandatory arguments may be given in any order, but if you pass -f <file.cmd>, it must be the last argument.')
    sys.exit(1)


# Read the .cmd file if one was given
def read_cmd_file():
    try:
        # Here f is followed by ":", so an error means that -f had no value
        # or that the value was a path that does not exist
        opts, args = getopt.getopt(sys.argv[1:], "si:h:p:r:f:")
        for opt, val in opts:
            if opt == '-f':
                with open(val, "r") as f:
                    contents = f.read()
                return contents.splitlines()
    except (getopt.GetoptError, OSError):
        print('\nNo input file was given as -f or incorrect path was given after -f')


# Takes a well-formed command, pauses for the number of seconds the command
# specifies, and then returns the rest of the text as a string
def execute_command(command):
    all_info = command.split()
    # The first element of the list is the number of seconds to wait
    time.sleep(int(all_info[0]))
    # Prepend the SUB_ID to the data that the subscriber will transmit. For example,
    # data will be formed as follows:      s1 sub #hello or s2 unsub #world etc.
    data = SUB_ID + ' ' + all_info[1] + ' '
    data = data + ' '.join(all_info[2:])
    return data


# Creates the subscriber's socket, binds it to the subscriber's own port, and
# connects it to the port on which the broker listens for subscribers
def initiate_socket():
    global sock
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    # We want this subscriber to transmit from a specific port
    sock.bind((HOST, PORT_SUB))
    # Connect to the port on which the broker listens for subscribers
    sock.connect((HOST, PORT_BROK))


# Sends a command to the broker after processing it. The command comes either
# from manual input or from the -f file.
def send_subs_unsubs(text, waiting=False):
    global sock
    data = execute_command(text)
    sock.sendall(bytes(data + "\n", "utf-8"))
    # Optional pause so that the user has time to read the broker's reply before
    # the whole message_to_subscriber is printed again. It must stay off while the
    # commands of the -f file are being executed, because message_to_subscriber is
    # not printed then and the commands run automatically. During the pause the user
    # cannot subscribe or unsubscribe manually, because the input is only processed
    # after the sleep. The listener runs in its own thread, however, so anything
    # published in the meantime on a topic this subscriber follows is still printed.
    if waiting:
        time.sleep(5)


# Receives the publications on the topics the subscriber follows, and also the
# broker's reply to every subscription/unsubscription request
def always_listening():
    global sock
    # Just an initial message on the subscriber's terminal
    print('''No matter what happens during the execution of this program, I am a daemon thread that,
    in 2 seconds from now, will start listening to the broker all the time and print what it sends
    back until the execution of the current program is terminated ''')
    while True:
        # ConnectionAbortedError means that the broker shut us down because it has
        # reached its maximum capacity (5) of subscribers
        try:
            received = str(sock.recv(1024), "utf-8")
            # An empty read means the broker closed the connection; stop instead of looping forever
            if not received:
                break
            print(received)
        except ConnectionAbortedError:
            break


def main():
    initiate_socket()
    start_thread_daemon(always_listening)
    # 2 seconds for the subscriber to read the first message printed by always_listening
    time.sleep(2)
    inputs = read_cmd_file()
    if inputs is None:
        print('You did not give an input file, proceeding to manual input of your subscriptions!\n')
    else:
        for command in inputs:
            # Keep the default waiting=False because we do not want to pause the program. The messages
            # from the broker will be clear to see since print(message_to_subscriber) is not executed.
            send_subs_unsubs(command)

    # On my localhost the final message from the broker arrived with a little lag, so we give the
    # daemon thread (always_listening) 1 second to deliver it before message_to_subscriber is printed
    time.sleep(1)
    print(message_to_subscriber)
    text_to_sub = input()
    try:
        while text_to_sub != 'quit':
            # Use waiting=True here because we want to pause the program. Otherwise the messages from
            # the broker would be hard to see, since print(message_to_subscriber) follows right after
            # to give the user instructions for the manual input of commands.
            send_subs_unsubs(text_to_sub, waiting=True)
            # Same 1-second allowance for the daemon thread as above
            time.sleep(1)
            print(message_to_subscriber)
            text_to_sub = input()
    except ValueError:
        print('You gave input that did not start with an integer followed by a space. Quitting...')
        sys.exit(0)
    except IndexError:
        print('You gave empty input. Quitting...')
        sys.exit(0)


if __name__ == "__main__":
    # Make sure that all the necessary arguments were given when executing the program
    gethelp(sys.argv)
    # Read the arguments passed when executing the .py file
    for i in range(len(sys.argv)):
        if sys.argv[i] == '-r':
            PORT_SUB = int(sys.argv[i+1])
        if sys.argv[i] == '-h':
            HOST = sys.argv[i+1]
        if sys.argv[i] == '-p':
            PORT_BROK = int(sys.argv[i+1])
        if sys.argv[i] == '-i':
            SUB_ID = sys.argv[i+1]

    try:
        main()
    # This exception handles an overloaded broker. If the broker already has five subscribers
    # connected, we exit gently. The exception is handled both here and in the daemon thread of
    # always_listening, because it is raised in both places if the broker has closed the socket
    # for that reason.
    except ConnectionAbortedError:
        print('\nFull capacity of subscribers. Please try to connect again later, once a connected subscriber has quit')
        time.sleep(2)
        sys.exit(0)
    # This exception can only occur during the manual inputs of the subscriber
    except KeyboardInterrupt:
        print('You performed KeyboardInterrupt. Quitting....')
...
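The command format described in message_to_subscriber (`<seconds> <sub|unsub> <topic>`) can be validated before anything is sent to the broker. The following is a minimal sketch, not part of sub.py: `Command`, `parse_command`, and `to_wire` are hypothetical names, and the wire format simply mirrors what `execute_command` builds (`s1 sub #hello` plus a trailing newline).

```python
from typing import NamedTuple


class Command(NamedTuple):
    delay: int   # seconds to wait before sending
    action: str  # "sub" or "unsub"
    topic: str   # a single keyword such as "#world"


def parse_command(line: str) -> Command:
    """Parse '<seconds> <sub|unsub> <topic>' or raise ValueError."""
    parts = line.split()
    if len(parts) != 3:
        raise ValueError(f"expected 3 fields, got {len(parts)}: {line!r}")
    delay = int(parts[0])  # raises ValueError for non-integer input
    if delay < 0:
        raise ValueError("delay must be >= 0")
    if parts[1] not in ("sub", "unsub"):
        raise ValueError(f"unknown action: {parts[1]!r}")
    return Command(delay, parts[1], parts[2])


def to_wire(sub_id: str, cmd: Command) -> bytes:
    """Build the newline-terminated message sent to the broker (assumed format)."""
    return f"{sub_id} {cmd.action} {cmd.topic}\n".encode("utf-8")
```

Separating parsing from sleeping and sending means empty input and malformed input both surface as a single `ValueError`, rather than as the `IndexError`/`ValueError` pair that sub.py relies on.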
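sub.py checks its arguments with `getopt` and then scans `sys.argv` a second time to read the values. As an alternative, and only as a sketch, the same four mandatory options plus the optional `-f` could be declared once with `argparse`. The function name `parse_args` and the attribute names (`sub_id`, `host`, `port_brok`, `port_sub`, `cmd_file`) are assumptions, and the `-s` help switch of the original is left out.

```python
import argparse


def parse_args(argv):
    # add_help=False frees -h so that it can mean "host", as in sub.py.
    parser = argparse.ArgumentParser(prog="sub.py", add_help=False)
    parser.add_argument("-i", dest="sub_id", required=True)
    parser.add_argument("-h", dest="host", required=True)
    parser.add_argument("-p", dest="port_brok", type=int, required=True)
    parser.add_argument("-r", dest="port_sub", type=int, required=True)
    # Optional command file; None when -f is not given.
    parser.add_argument("-f", dest="cmd_file", default=None)
    return parser.parse_args(argv)
```

Called as `parse_args(sys.argv[1:])`, this accepts the options in any order, converts the ports to `int`, and exits with a usage message when a mandatory option is missing.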
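The sns_listener.py excerpt builds one coroutine per subscriber and awaits them all with `asyncio.wait`. Since Python 3.11, `asyncio.wait` no longer accepts bare coroutine objects, so they would have to be wrapped in tasks first. The sketch below shows the same fan-out pattern with `asyncio.gather` instead. `deliver`, `fan_out`, and `publish` are hypothetical stand-ins, not LocalStack functions, and the delivery itself is simulated.

```python
import asyncio


async def deliver(subscriber: dict, message: str, subscription_arn=None):
    """Hypothetical stand-in for message_to_subscriber."""
    # Skip subscribers that do not match an explicitly requested subscription.
    if subscription_arn not in [None, subscriber["SubscriptionArn"]]:
        return None
    await asyncio.sleep(0)  # placeholder for the real network I/O
    return f'{subscriber["SubscriptionArn"]} <- {message}'


async def fan_out(subscribers, message, subscription_arn=None):
    # gather() accepts coroutines directly and returns results in input order.
    results = await asyncio.gather(
        *(deliver(s, message, subscription_arn) for s in list(subscribers))
    )
    return [r for r in results if r is not None]


def publish(subscribers, message, subscription_arn=None):
    # Synchronous entry point, mirroring the asyncio.run(...) call in the excerpt.
    return asyncio.run(fan_out(subscribers, message, subscription_arn))
```

Unlike `asyncio.wait`, `gather` re-raises the first exception from any delivery by default. Passing `return_exceptions=True` would keep one failing subscriber from hiding the results of the others.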
