How to use __flush method in avocado

Best Python code snippet using avocado_python

transeiver.py

Source:transeiver.py Github

copy

Full Screen

import serial
import multiprocessing
import time


class Transeiver:
    """Talk to a device over a serial port using one-character command prefixes.

    Outgoing data is written as text terminated by ``__FLUSH``. Incoming lines
    are read by a separate process (see ``receiving``) and handed back to this
    object through a ``multiprocessing.Queue``.

    NOTE(review): the TX/RX address commands suggest the device is a radio
    transceiver bridge -- confirm against the firmware on the other end.
    """

    # Command prefixes understood by the device on the other side of the port.
    __MESSAGING = '0'
    __SET_TX_ADDRESS = '1'
    __SET_RX_ADDRESS = '2'
    __GET_TX_ADDRESS = '3'
    __GET_RX_ADDRESS = '4'
    __FLUSH = '\r'  # should use something different

    def __init__(self, TXaddress="00000", RXaddress="11111", SERIAL_PORT_NAME=None, BAUD_RATE=9600):
        """Store the addresses and, if a port name is given, connect at once.

        Args:
            TXaddress: Transmit address pushed to the device on connection.
            RXaddress: Receive address pushed to the device on connection.
            SERIAL_PORT_NAME: Port to open; ``None`` defers the connection
                until ``setConnection`` is called.
            BAUD_RATE: Baud rate used when opening the port.
        """
        self.__serial_port = None
        self.TXaddress = TXaddress
        self.RXaddress = RXaddress
        self.__fetchMessagesProcess = None
        self.__receive_queue = None
        if SERIAL_PORT_NAME is not None:
            self.setConnection(SERIAL_PORT_NAME, BAUD_RATE)

    def setConnection(self, SERIAL_PORT_NAME, BAUD_RATE):
        """Open the port, start the receiver process and push both addresses."""
        self.__serial_port = serial.Serial(SERIAL_PORT_NAME, BAUD_RATE)
        self.__jottleConnection()
        self.__startParallelReceivingProcess()
        self.setTXaddress(self.TXaddress)
        self.setRXaddress(self.RXaddress)

    def __jottleConnection(self):
        """Toggle DTR/RTS with short pauses after opening the port.

        The original author notes that this prevents a lock-up but did not
        know why; the sequence is kept exactly as it was.
        """
        time.sleep(0.1)
        self.__serial_port.setDTR(False)
        time.sleep(0.1)
        self.__serial_port.setRTS(True)
        time.sleep(0.1)

    def __startParallelReceivingProcess(self):
        """Create the receive queue and start the process that fills it."""
        self.__receive_queue = multiprocessing.Queue()
        self.__fetchMessagesProcess = multiprocessing.Process(
            target=Transeiver.receiving,
            args=(self.__serial_port, self.__receive_queue),
        )  # maybe just make a new one
        self.__fetchMessagesProcess.start()

    def __writeLine(self, text):
        """Write ``text`` followed by the flush terminator, UTF-8 encoded."""
        self.__serial_port.write(bytes(text + Transeiver.__FLUSH, encoding='utf-8'))

    def setAddresses(self, TXaddress, RXaddress):
        """Set the transmit and the receive address, in that order."""
        self.setTXaddress(TXaddress)
        self.setRXaddress(RXaddress)

    def setTXaddress(self, TXaddress):
        """Send ``TXaddress`` to the device and store the value it reports back.

        The address passed in is the one sent (the previous code ignored the
        argument and re-sent the stored ``self.TXaddress``). The call blocks
        until the receiver process puts the next line on the queue.
        """
        self.__writeLine(Transeiver.__SET_TX_ADDRESS + TXaddress)
        self.__writeLine(Transeiver.__GET_TX_ADDRESS)
        self.TXaddress = self.__receive_queue.get()

    def setRXaddress(self, RXaddress):
        """Send ``RXaddress`` to the device and store the value it reports back.

        The address passed in is the one sent (the previous code ignored the
        argument and re-sent the stored ``self.RXaddress``). The call blocks
        until the receiver process puts the next line on the queue.
        """
        self.__writeLine(Transeiver.__SET_RX_ADDRESS + RXaddress)
        self.__writeLine(Transeiver.__GET_RX_ADDRESS)
        self.RXaddress = self.__receive_queue.get()

    def getTXaddress(self):
        """Return the currently stored transmit address."""
        return self.TXaddress

    def getRXaddress(self):
        """Return the currently stored receive address."""
        return self.RXaddress

    def __printSerialPortState(self):
        """Print the modem control and status lines of the port."""
        print("DTR: ", self.__serial_port.dtr)
        print("RTS: ", self.__serial_port.rts)
        print("CTS: ", self.__serial_port.cts)
        print("DSR: ", self.__serial_port.dsr)
        print("RI: ", self.__serial_port.ri)
        print("CD: ", self.__serial_port.cd)

    @staticmethod
    def receiving(ser, queue):
        """Read lines from ``ser`` forever, queueing and printing each one.

        Runs as the target of the parallel receiving process. Each line is
        turned into its ``str()`` form and trimmed to the part after the first
        ':'; lines that do not have the expected shape are passed on untrimmed.

        Args:
            ser: Object with a ``readline()`` method (the open serial port).
            queue: Queue that receives every processed line.
        """
        while True:
            reading = str(ser.readline())
            try:
                # Without this guard an INVALID state message (no ':') would
                # make index() raise and kill the receiver loop.
                l = reading.index(':') + 1
                if "\\x" in reading:
                    reading = reading[l:len(reading) - 3]
                else:
                    r = reading.index('\\')
                    reading = reading[l:r]
            except ValueError:
                # str.index() found no ':' or no backslash: keep the raw text.
                pass
            queue.put(reading)
            print("\r" + str(reading) + (" " * 50) + "\n")
            print("\nEnter a message to write:", end="")

    def getMessage(self):
        """Return the next queued message, or ``None`` when there is none.

        ``None`` is also returned when no connection has been opened yet. The
        previous code raised UnboundLocalError in both of those cases.
        """
        pending = self.__receive_queue
        if pending is None or pending.empty():
            return None
        return pending.get()

    def sendMessage(self, message):
        """Write ``message`` to the port, terminated by the flush character."""
        self.__writeLine(message)

    def __del__(self):
        """Stop the receiver process, if one was ever started."""
        # getattr with the mangled name keeps this safe even when __init__
        # never ran or no connection was opened.
        process = getattr(self, '_Transeiver__fetchMessagesProcess', None)
        if process is not None:
            process.terminate()
    # (the original listing is truncated at this point)

Full Screen

Full Screen

profiler.py

Source:profiler.py Github

copy

Full Screen

...33 Returns:34 A new ProfileEvent.35 """36 if len(self._events) >= 100:37 self.__flush()38 now = Timestamp(time())39 event = ProfileEvent(self._instance_id, now, now, event_type, port,40 port_length, slot, message_size)41 self._events.append(event)42 return event43 def shutdown(self) -> None:44 self.__flush()45 def record_event(self, event: ProfileEvent) -> None:46 """Record a profiling event.47 This will record the event, and may flush this and previously48 recorded events to the manager.49 Args:50 event: The event to record.51 """52 self._events.append(event)53 if len(self._events) >= 100:54 self.__flush()55 if event.event_type == ProfileEventType.DEREGISTER:56 self.__flush()57 def __flush(self) -> None:58 if self._events:59 self._manager.submit_profile_events(self._events)...

Full Screen

Full Screen

stat_debug.py

Source:stat_debug.py Github

copy

Full Screen

...41 def __del__(self):42 self.__close()43 def __close(self):44 if self.__file is not None:45 self.__flush()46 self.__file.flush()47 self.__file.close()48 self.__file = None49 def __flush(self):50 if self.__row:51 self.__file.write(','.join(self.__row) + '\n')52 self.__row = []53 def write(self, text):54 item = str(text).strip(' ')55 if item == '':56 return57 if item.strip():58 if item.find('ncalls') != -1:59 self.__row.extend([caption for caption in item.split()])60 else:61 self.__row.append(item)62 if item.find('\n') == -1:63 return...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful