How to use message_to_subscriber method in localstack

Best Python code snippet using localstack_python

sns_listener.py

...
    sns_backend = SNSBackend.get()
    subscriptions = sns_backend.sns_subscriptions.get(topic_arn, [])

    async def wait_for_messages_sent():
        subs = [
            message_to_subscriber(
                message_id,
                message,
                topic_arn,
                req_data,
                headers,
                subscription_arn,
                skip_checks,
                sns_backend,
                subscriber,
                subscriptions,
            )
            for subscriber in list(subscriptions)
        ]
        if subs:
            await asyncio.wait(subs)

    asyncio.run(wait_for_messages_sent())


async def message_to_subscriber(
    message_id,
    message,
    topic_arn,
    req_data,
    headers,
    subscription_arn,
    skip_checks,
    sns_backend,
    subscriber,
    subscriptions,
):
    if subscription_arn not in [None, subscriber["SubscriptionArn"]]:
        return
    filter_policy = json.loads(subscriber.get("FilterPolicy") or "{}")
...
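The excerpt above fans one message out to every subscriber of a topic and skips subscribers whose ARN does not match a requested `subscription_arn`. The following is a minimal, self-contained sketch of that pattern, not LocalStack's actual implementation. The names `deliver`, `fan_out`, and `publish` are hypothetical stand-ins, and the delivery itself is replaced by a placeholder. The sketch uses `asyncio.gather` rather than `asyncio.wait`, because passing bare coroutine objects to `asyncio.wait` is no longer accepted on recent Python versions (3.11 and later).

```python
import asyncio


async def deliver(message, subscriber, subscription_arn=None):
    """Hypothetical stand-in for message_to_subscriber.

    Returns the subscriber's ARN if the message was delivered, else None.
    """
    # Same guard as in the excerpt: when a specific subscription ARN is
    # requested, skip every subscriber that does not match it.
    if subscription_arn not in [None, subscriber["SubscriptionArn"]]:
        return None
    await asyncio.sleep(0)  # placeholder for the real network I/O
    return subscriber["SubscriptionArn"]


async def fan_out(message, subscriptions, subscription_arn=None):
    # Build one coroutine per subscriber; list() copies the collection so
    # that concurrent modification cannot affect the iteration.
    coros = [deliver(message, s, subscription_arn) for s in list(subscriptions)]
    if not coros:
        return []
    # gather() runs the coroutines concurrently and preserves input order.
    results = await asyncio.gather(*coros)
    return [arn for arn in results if arn is not None]


def publish(message, subscriptions, subscription_arn=None):
    # Synchronous entry point, mirroring asyncio.run(...) in the excerpt.
    return asyncio.run(fan_out(message, subscriptions, subscription_arn))
```

Calling `publish("hi", subs)` with two subscriber dictionaries delivers to both, while passing `subscription_arn` restricts delivery to the matching subscriber.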

sub.py

#!/usr/bin/python3
import socket
import sys
import getopt
import time
import threading

'''The following variables:
    PORT_SUB
    HOST
    PORT_BROK
    SUB_ID
   will be defined in the if __name__ == "__main__":
   block at the end, depending on the sys.argv (system argument values)
   that the user supplied when executing the current program.'''

'''The string variable message_to_subscriber is the message that the operator of the
   subscriber program sees when the commands of the .cmd file run out, or when no .cmd file is given.
   It is also shown again after every valid input that raises no errors,
   until the operator quits subscribing/unsubscribing.'''
message_to_subscriber = '''If you want to quit subscribing there are four ways:\n
    a) Type quit
    b) Give empty input
    c) Give an input that will not start with an integer and then space.
    d) Perform KeyboardInterrupt
    In any other case I will assume that you are trying to subscribe or unsubscribe from a thread
    If you want to subscribe somewhere, this must be formed as follows:
    10 sub #world
    If you want to unsubscribe from somewhere, this must be formed as follows:
    300 unsub #hello
    The first column in the file represents the number of seconds that the subscriber should wait before executing that
    command (in the above cases 10 or 300). This number should be greater or equal to 0. The second column represents the command to execute:
    this will always be sub or unsub for the subscriber. The third column represents the topic that the subscriber will subscribe/unsubscribe
    to. All topics both for publisher and subscriber are one keyword.
    If you do not use this formalization, and you have not quit this program,
    you will not subscribe/unsubscribe from anywhere but you will be asked again to give something as input.
    '''


# Receives another function and executes it as a daemon thread
def start_thread_daemon(thread_func):
    thread = threading.Thread(target=thread_func, daemon=True)
    thread.start()


# The first four arguments are mandatory when calling the file (only -f is optional)
def gethelp(argv):
    arg_help = "-i <id> -h <ip> -p <brok> -r <sub> -f <file.cmd (optional)>"
    try:
        # f without ":" because -f is optional
        opts, args = getopt.getopt(argv[1:], "si:h:p:r:f")
    except getopt.GetoptError:
        # Print a message that helps the user call the program properly (defined below)
        stop_wrong_import(arg_help)

    # Fewer than 4 arguments (or more than 5, which is also wrong) were given, so stop
    if len(opts) != 4 and len(opts) != 5:
        stop_wrong_import(arg_help)
    elif len(opts) == 4:
        for opt, val in opts:
            # Someone needs help: s was typed as an argument
            if 's' in opt:
                stop_wrong_import(arg_help)
            if opt not in ['-i', '-h', '-p', '-r']:
                stop_wrong_import(arg_help)
            else:
                if val == '-f':
                    stop_wrong_import(arg_help)
                else:
                    pass
    else:
        pass


def stop_wrong_import(message):
    print('Please it is mandatory to import all the following arguments:')
    print(message)
    print('The 4 mandatory arguments may be inserted at a random order, but if you insert -f <file.cmd>, this must be the last argument to be typed ')
    sys.exit(1)


# Read the .cmd file if it exists
def read_cmd_file():
    try:
        # Here f is followed by ":", so an error means that -f had no value
        # or that the value was a path that does not exist
        opts, args = getopt.getopt(sys.argv[1:], "si:h:p:r:f:")
        for opt, val in opts:
            if opt == '-f':
                with open(val, "r") as f:
                    contents = f.read()
                return contents.splitlines()
    except (getopt.GetoptError, OSError):
        print('\nNo input file was given as -f or incorrect path was given after -f')


# Takes a formalized command, pauses for the number of seconds the command
# specifies, and then returns the rest of the text as a string
def execute_command(command):
    all_info = command.split()
    # The first element of the list is the number of seconds to wait
    time.sleep(int(all_info[0]))
    # Prepend the SUB_ID to the data that the subscriber will transmit. For example,
    # data will be formalized as follows: s1 sub #hello or s2 unsub #world etc.
    data = SUB_ID + ' ' + all_info[1] + ' '
    data = data + ' '.join(all_info[2:])
    return data


# Creates a socket, binds it to the subscriber's own port, and connects it
# to the port on which the broker listens for subscribers
def initiate_socket():
    global sock
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    # We want this subscriber to transmit from a specific port
    sock.bind((HOST, PORT_SUB))
    # Connect to the port on which the broker listens for subscribers
    sock.connect((HOST, PORT_BROK))


# Communicates actively with the broker by sending it the commands, after processing
# the input that the subscriber entered manually or automatically (via the -f file)
def send_subs_unsubs(text, waiting=False):
    global sock
    data = execute_command(text)
    sock.sendall(bytes(data + "\n", "utf-8"))
    # Optional sleep so that the user has time to read what was received from the broker
    # before the whole message_to_subscriber is printed on the command line. When this function
    # is executed for the commands in the -f file, waiting must not be activated, since
    # message_to_subscriber is not printed and the subscriptions/unsubscriptions are executed
    # automatically. During the sleep the user cannot subscribe or unsubscribe manually, because
    # the input will be executed after the sleep. But since always_listening runs in a thread,
    # if a publisher meanwhile sends something on a topic this subscriber is enrolled in,
    # the message will still appear on the screen.
    if waiting:
        time.sleep(5)


# Receives not only the publications on the topics the subscriber is enrolled in, but also
# the broker's reply about the status of every subscription/unsubscription request
def always_listening():
    global sock
    # Just an initial message to the terminal of the subscriber
    print('''No matter what happens during the execution of this program, I am a daemon thread that
    ,in 2 seconds from now, will start listening to the broker all the time and print what sends
    back until the execution of the current program is terminated ''')
    while True:
        # ConnectionAbortedError means that the broker shut us down because it has
        # reached max capacity (5) for subscribers
        try:
            received = str(sock.recv(1024), "utf-8")
            print(received)
        except ConnectionAbortedError:
            break


def main():
    initiate_socket()
    start_thread_daemon(always_listening)
    # 2 seconds for the subscriber to read the first message printed inside always_listening
    time.sleep(2)
    inputs = read_cmd_file()
    if inputs is None:
        print('You did not give input file, proceeding to manually input your subscriptions!\n')
        pass
    else:
        for command in inputs:
            # Here we leave the default waiting=False because we don't want to pause the program.
            # The messages from the broker will be clear to see since print(message_to_subscriber) won't be executed.
            send_subs_unsubs(command)

    # On my localhost there was a little lag in the final receipt of the message from the broker, so we give 1 second
    # to the daemon thread (always_listening) to deliver the final message before message_to_subscriber appears on the screen
    time.sleep(1)
    print(message_to_subscriber)
    text_to_sub = input()
    try:
        while text_to_sub != 'quit':
            # Here we use waiting=True because we want to pause the program.
            # The messages from the broker won't be clear to see since print(message_to_subscriber) will be executed to give instructions
            # to the user for manual input of the messages to be published
            send_subs_unsubs(text_to_sub, waiting=True)
            # On my localhost there was a little lag in the final receipt of the message from the broker, so we give 1 second
            # to the daemon thread (always_listening) to deliver the final message before message_to_subscriber appears on the screen
            time.sleep(1)
            print(message_to_subscriber)
            text_to_sub = input()
    except ValueError:
        print('You gave input that did not start with an integer and then space. Quitting...')
        sys.exit(0)
    except IndexError:
        print('You gave empty input. Quitting...')
        sys.exit(0)


if __name__ == "__main__":
    # Make sure that all the necessary arguments were given when executing the program
    gethelp(sys.argv)
    # Read the arguments from the execution of the .py file
    for i in range(len(sys.argv)):
        if sys.argv[i] == '-r':
            PORT_SUB = int(sys.argv[i + 1])
        if sys.argv[i] == '-h':
            HOST = sys.argv[i + 1]
        if sys.argv[i] == '-p':
            PORT_BROK = int(sys.argv[i + 1])
        if sys.argv[i] == '-i':
            SUB_ID = sys.argv[i + 1]

    try:
        main()
    # This exception handles the overload of the system. If the broker already has five subscribers connected, we exit gently.
    # The exception is handled both here and in the daemon thread of always_listening, because it would be raised in both places
    # if the broker had closed the socket for that reason.
    except ConnectionAbortedError:
        print('\nFull capacity of subscribers. Please try to connect in a while if a connected subscriber quits his connection')
        time.sleep(2)
        sys.exit(0)
    # This exception may only be raised at the time of the manual inputs of the publisher
    except KeyboardInterrupt:
        print('You performed KeyboardInterrupt. Quitting....')
...
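The command format described in `message_to_subscriber` (`<seconds> <sub|unsub> <topic>`) can be checked in isolation. The sketch below is a hypothetical helper, `parse_command`, that is not part of sub.py: it repeats the string handling of `execute_command` but returns the delay instead of sleeping, and takes the subscriber ID as a parameter instead of reading the global `SUB_ID`. It raises the same exceptions that `main()` relies on: `ValueError` when the first column is not an integer, and `IndexError` when the input is empty or has no command column.

```python
def parse_command(command, sub_id):
    """Split '<seconds> <sub|unsub> <topic>' into (delay, payload).

    Hypothetical helper mirroring execute_command from sub.py, without the sleep.
    """
    parts = command.split()
    # parts[0] raises IndexError on empty input; int() raises ValueError
    # when the first column is not an integer.
    delay = int(parts[0])
    # The payload sent to the broker is '<sub_id> <command> <topic>'.
    payload = sub_id + " " + parts[1] + " " + " ".join(parts[2:])
    return delay, payload


if __name__ == "__main__":
    print(parse_command("10 sub #world", "s1"))      # (10, 's1 sub #world')
    print(parse_command("300 unsub #hello", "s2"))   # (300, 's2 unsub #hello')
```

Separating parsing from sleeping in this way makes the command format testable without waiting for the delay to elapse.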
