How to use read_thread method in locust

Best Python code snippet using locust

BillValidator_Thread.py

Source:BillValidator_Thread.py Github

copy

Full Screen

1__author__ = 'ardzoht'2import serial3from threading import Thread4from VMC.Utils import Commands, Tubes5import time6import json7import logging8import requests9class ReadThread(Thread):10 # Bill information11 deposited_20, deposited_50, deposited_100, deposited_200 = False, False, False, False12 validator_ok = False13 # Balance information14 balance = 015 bill_count = 016 # Flags17 must_accept = False18 must_reset = False19 first_run = False20 deposited_50c, deposited_1, deposited_2, deposited_5, deposited_10 = False, False, False, False, False21 available_50c, available_1, available_2, available_5 = 0, 0, 0, 022 tubes = Tubes.Tubes()23 hopper_ok = False24 dispense_error = False25 logging.basicConfig(level=logging.INFO)26 def __init__(self, thread_id, name):27 Thread.__init__(self)28 self.thread_id = thread_id29 self.name = name30 def run(self):31 logging.info('Starting thread: ' + self.name)32 self.reading()33 def reading(self):34 read, reading = True, ''35 while read:36 try:37 """38 NOTE:39 Check how is the reading handled.40 """41 data = serial_port.read()42 str_data = str(data)43 print str_data44 if str_data == '<':45 logging.info('Received: {}'.format(reading))46 if reading == 'B_BAC_FF80':47 self.deposited_20 = True48 logging.info('Validator READER -> $20 Deposited')49 elif reading == 'B_BAC_FF81':50 self.deposited_50 = True51 logging.info('Validator READER -> $50 Deposited')52 elif reading == 'B_BAC_FF82':53 self.deposited_100 = True54 logging.info('Validator READER -> $100 Deposited')55 elif reading == 'B_BAC_FF83':56 self.deposited_200 = True57 logging.info('Validator READER -> $200 Deposited')58 elif reading == 'B_BAC_FF94':59 logging.info('Validator READER -> $500 NOT ALLOWED')60 elif reading == 'B_BTY_13130000<' or reading == 'B_BTY_15150000<':61 logging.info('Bills Enabled')62 elif reading[:-2] == 'C_CDM_50':63 available_50c = int(reading[-2:], 16)64 print 'READER: 1 coin $0.5, in tube: {}'.format(available_50c)65 self.deposited_50c = True66 elif reading[:-2] == 'C_CDM_52':67 available_1 = int(reading[-2:], 16)68 print 'READER: 1 coin $1.0, in tube: {}'.format(available_1)69 self.deposited_1 = True70 elif reading[:-2] == 'C_CDM_53':71 available_2 = int(reading[-2:], 16)72 print 'READER: 1 coin $2.0, in tube: {}'.format(available_2)73 self.deposited_2 = True74 elif reading[:-2] == 'C_CDM_54':75 available_5 = int(reading[-2:], 16)76 print 'READER: 1 coin $5.0, in tube: {}'.format(available_5)77 self.deposited_5 = True78 elif reading[:-2] == 'C_CDM_45':79 print 'READER: 1 coin $10.0 deposited to hopper'80 self.deposited_10 = True81 elif reading[:6] == 'C_TUB_':82 self.tubes.full_tubes = reading[7:10]83 self.tubes.tube_50c = int(reading[11:12], 16)84 self.tubes.tube_1 = int(reading[13:16])85 self.tubes.tube_2 = int(reading[17:18], 16)86 self.tubes.tube_5 = int(reading[19:20], 16)87 # Check for hopper availability88 elif reading == 'H_STA_OK<':89 self.hopper_ok = True90 elif reading == 'H_STA_ERROR<':91 self.hopper_ok = False92 print 'READER: Full tubes: {}'.format(self.tubes.full_tubes)93 print 'READER: Coins on tube_50c: {}'.format(self.tubes.tube_50c)94 print 'READER: Coins on tube_1: {}'.format(self.tubes.tube_1)95 print 'READER: Coins on tube_2: {}'.format(self.tubes.tube_2)96 print 'READER: Coins on tube_5: {}'.format(self.tubes.tube_5)97 # Check for Hopper/Changer error responses98 elif reading[:-2] == 'H_DIS_TIMEOUT_':99 self.dispense_error = True100 elif reading[:-1] == 'C_DIS_ERROR':101 self.dispense_error = True102 reading = ''103 else:104 reading += data105 except serial.SerialException:106 logging.error('Reader: There has been an error with the validator.')107class WriteThread(Thread):108 thread_id = None109 name = None110 bill_commands = Commands.BillCommands()111 def __init__(self, thread_id, name):112 Thread.__init__(self)113 self.thread_id = thread_id114 self.name = name115 def write_cmd(self, cmd):116 # TRY to open the serial port. Check if it's open117 try:118 if serial_port.isOpen():119 # Write the requested command120 serial_port.write(cmd['cmd'])121 logging.info('Sent: {} cmd'.format(cmd['name']))122 else:123 logging.info('Serial port NOT OPEN')124 except serial.SerialException:125 logging.error('Exception, data not written')126 time.sleep(1)127 # Bill Validator Commands128 def write_status(self):129 self.write_cmd(self.bill_commands.BILL_STATUS)130 def write_disable_all(self):131 self.write_cmd(self.bill_commands.BILL_DISABLE_ALL)132 def write_enable_all(self):133 self.write_cmd(self.bill_commands.BILL_ENABLE_20)134 self.write_cmd(self.bill_commands.BILL_ENABLE_50)135class BillValidator(Thread):136 read_thread = ReadThread(1, 'Reader')137 write_thread = WriteThread(2, 'Writer')138 vmc_commands = Commands.VmcCommands()139 number_of_coins = 100140 error_amount = 0141 def start_validator(self):142 """143 Sends commands to update validator144 :return:145 """146 self.write_thread.write_status()147 self.write_thread.write_disable_all()148 def update_data(self):149 self.write_thread.write_cmd(self.vmc_commands.C_TUBE_STATUS)150 av_50c = self.read_thread.tubes.tube_50c151 av_1 = self.read_thread.tubes.tube_1152 av_2 = self.read_thread.tubes.tube_2153 av_5 = self.read_thread.tubes.tube_5154 coins_on_hopper = self.number_of_coins155 data = [{"currency_id": 1, "currency_name": "50 centavos", "currency_quantity": av_50c},156 {"currency_id": 2, "currency_name": "1 peso", "currency_quantity": av_1},157 {"currency_id": 3, "currency_name": "2 pesos", "currency_quantity": av_2},158 {"currency_id": 4, "currency_name": "5 pesos", "currency_quantity": av_5},159 {"currency_id": 5, "currency_name": "10 pesos", "currency_quantity": coins_on_hopper}]160 url = "http://localhost:8000/Currency/"161 print "Uploading to " + url162 headers = {"Authorization": "Basic YmVjYXJpbzpiZWNhcmlv",163 "Content-Type": "application/json"}164 for d_id in xrange(1,5):165 json_data = json.dumps(data[d_id], default=lambda o: o.__dict__,166 sort_keys=True, indent=4)167 url = "http://localhost:8000/Currency/" + str(d_id) + "/"168 request = requests.put(url, data=json_data, headers=headers)169 def start_changer(self):170 self.write_thread.write_cmd(self.vmc_commands.C_SETUP)171 self.write_thread.write_cmd(self.vmc_commands.C_TUBE_STATUS)172 self.write_thread.write_cmd(self.vmc_commands.C_EXPANSION)173 self.write_thread.write_cmd(self.vmc_commands.disable_tubes())174 self.update_data()175 def write_dispense(self, units=0, cents=0):176 """ Writes a Dispense command. Also evaluates which coins should be dispensed. """177 # Evaluate coins.178 res = int(units)179 quantity_10 = res / 10180 res %= 10181 quantity_5 = res / 5182 res %= 5183 quantity_2 = res / 2184 res %= 2185 quantity_1 = res / 1186 quantity_50c = int(cents) / 5187 # Dispense coins depending on the coins it should dispense.188 print "Dispensing " + str(quantity_10) + " coins of 10"189 print "Dispensing " + str(quantity_5) + " coins of 5"190 print "Dispensing " + str(quantity_2) + " coins of 2"191 print "Dispensing " + str(quantity_1) + " coins of 1"192 print "Dispensing " + str(quantity_50c) + " coins of 50c"193 self.write_thread.write_cmd(self.vmc_commands.enable_tubes())194 # Check tubes status195 self.write_thread.write_cmd(self.vmc_commands.C_TUBE_STATUS)196 # Get available coins on each tube197 av_50c = self.read_thread.tubes.tube_50c198 print "Available 50c = " + str(av_50c)199 av_1 = self.read_thread.tubes.tube_1200 print "Available 1 = " + str(av_1)201 av_2 = self.read_thread.tubes.tube_2202 print "Available 2 = " + str(av_2)203 av_5 = self.read_thread.tubes.tube_5204 print "Available 5 = " + str(av_5)205 # Chopper206 if quantity_10 != 0:207 print "Dispensing Hopper coins = " + str(quantity_10)208 self.write_thread.write_cmd(self.vmc_commands.check_hopper())209 self.write_thread.write_cmd(self.vmc_commands.hopper_dispense(quantity_10))210 self.number_of_coins -= quantity_10211 if quantity_5 != 0:212 print "Dispensing Changer coins = " + str(quantity_5)213 self.write_thread.write_cmd(self.vmc_commands.changer_dispense(4, quantity_5))214 if quantity_2 != 0:215 print "Dispensing Changer coins = " + str(quantity_2)216 self.write_thread.write_cmd(self.vmc_commands.changer_dispense(3, quantity_2))217 if quantity_1 != 0:218 print "Dispensing Changer coins = " + str(quantity_1)219 self.write_thread.write_cmd(self.vmc_commands.changer_dispense(2, quantity_1))220 if quantity_50c != 0:221 print "Dispensing Changer coins = " + str(quantity_50c)222 self.write_thread.write_cmd(self.vmc_commands.changer_dispense(0, quantity_50c))223 # Abonar224 """225 if quantity_10 != 0 and quantity_10 <= self.number_of_coins:226 print "Dispensing Hopper coins = " + str(quantity_10)227 self.write_thread.write_cmd(self.commands.check_hopper())228 self.write_thread.write_cmd(self.commands.hopper_dispense(quantity_10))229 self.number_of_coins -= quantity_10230 elif quantity_10 != 0 and quantity_10 > self.number_of_coins:231 self.error_amount += quantity_10*10232 self.read_thread.dispense_error = True233 print "10 " + str(self.error_amount)234 if quantity_5 != 0 and quantity_5 <= av_5:235 print "Dispensing Changer coins = " + str(quantity_5)236 self.write_thread.write_cmd(self.commands.changer_dispense(4, quantity_5))237 if self.read_thread.dispense_error:238 self.error_amount -= quantity_5*5239 elif quantity_5 != 0 and quantity_2 > av_5:240 self.error_amount += quantity_5*5241 self.read_thread.dispense_error = True242 print "5 " + str(self.error_amount)243 if quantity_2 != 0 and quantity_2 <= av_2:244 print "Dispensing Changer coins = " + str(quantity_2)245 self.write_thread.write_cmd(self.commands.changer_dispense(3, quantity_2))246 if self.read_thread.dispense_error:247 self.error_amount -= quantity_2*2248 elif quantity_2 != 0 and quantity_2 > av_2:249 self.error_amount += quantity_2*2250 self.read_thread.dispense_error = True251 print "2 " + str(self.error_amount)252 if quantity_1 != 0 and quantity_1 <= av_1:253 print "Dispensing Changer coins = " + str(quantity_1)254 self.write_thread.write_cmd(self.commands.changer_dispense(2, quantity_1))255 if self.read_thread.dispense_error:256 self.error_amount -= quantity_1257 elif quantity_1 != 0 and quantity_1 > av_1:258 self.error_amount += quantity_1259 self.read_thread.dispense_error = True260 print " 1 " +str(self.error_amount)261 if quantity_50c != 0 and quantity_50c <= av_50c:262 print "Dispensing Changer coins = " + str(quantity_50c)263 self.write_thread.write_cmd(self.commands.changer_dispense(0, quantity_50c))264 if self.read_thread.dispense_error:265 self.error_amount -= quantity_50c*.5266 elif quantity_50c != 0 and quantity_50c > av_50c:267 self.error_amount += quantity_50c*.5268 self.read_thread.dispense_error = True269 print "50c " + str(self.error_amount)270 """271 self.write_thread.write_cmd(self.vmc_commands.disable_tubes())272 def write_accept(self):273 if not(self.read_thread.deposited_20 or self.read_thread.deposited_50 or274 self.read_thread.deposited_100 or self.read_thread.deposited_200):275 return 0.0276 elif self.read_thread.deposited_20:277 self.read_thread.deposited_20 = False278 return 20.0279 elif self.read_thread.deposited_50:280 self.read_thread.deposited_50 = False281 return 50.0282 elif self.read_thread.deposited_100:283 self.read_thread.deposited_100 = False284 return 100.0285 elif self.read_thread.deposited_200:286 self.read_thread.deposited_200 = False287 return 200.0288 else:289 return 0.0290 def write_changer_accept(self):291 print('ready to accept coins')292 if(not self.read_thread.deposited_50c) and (not self.read_thread.deposited_1) \293 and (not self.read_thread.deposited_2) and (not self.read_thread.deposited_5)\294 and (not self.read_thread.deposited_10):295 pass296 elif self.read_thread.deposited_50c:297 self.read_thread.deposited_50c = False298 return 0.5299 elif self.read_thread.deposited_1:300 self.read_thread.deposited_1 = False301 return 1.0302 elif self.read_thread.deposited_2:303 self.read_thread.deposited_2 = False304 return 2.0305 elif self.read_thread.deposited_5:306 self.read_thread.deposited_5 = False307 return 5.0308 elif self.read_thread.deposited_10:309 self.number_of_coins += 1310 self.read_thread.deposited_10 = False311 return 10.0312 else:313 return 0314 def socket_com(self, command=''):315 cmd, param = command.split()316 if cmd == 'ACCEPTBILL':317 return self.write_accept()318 elif cmd == 'DISPENSE':319 self.update_data()320 units, cents = param.split('.')321 self.write_dispense(units, cents)322 return 'OK'323 elif cmd == 'ACCEPT':324 print 'command accept'325 return self.write_changer_accept()326 else:327 pass328 def __init__(self, universal_port):329 super(BillValidator, self).__init__()330 global serial_port331 serial_port = universal_port332 def run(self):333 logging.info('Starting serial port')334 self.read_thread.start()335 self.start_validator()...

Full Screen

Full Screen

Changer_Thread.py

Source:Changer_Thread.py Github

copy

Full Screen

1import json2__author__ = 'mgradob'3""" Imports """4import threading5import serial6from VMC.Utils import Commands, Tubes7import time8import requests9class ReadThread(threading.Thread):10 deposited_50c, deposited_1, deposited_2, deposited_5, deposited_10 = False, False, False, False, False11 available_50c, available_1, available_2, available_5 = 0, 0, 0, 012 tubes = Tubes.Tubes()13 hopper_ok = False14 dispense_error = False15 def __init__(self, thread_id, name):16 threading.Thread.__init__(self)17 self.thread_id = thread_id18 self.name = name19 def run(self):20 print 'Starting thread: ' + self.name21 self.reading()22 def reading(self):23 read, reading = True, ''24 while read:25 try:26 data = serial_port.read()27 str_data = str(data)28 # print str_data29 if str_data == '<':30 print 'Received: {}'.format(reading)31 if reading[:-2] == 'C_CDM_50':32 available_50c = int(reading[-2:],16)33 print 'READER: 1 coin $0.5, in tube: {}'.format(available_50c)34 self.deposited_50c = True35 elif reading[:-2] == 'C_CDM_52':36 available_1 = int(reading[-2:],16)37 print 'READER: 1 coin $1.0, in tube: {}'.format(available_1)38 self.deposited_1 = True39 elif reading[:-2] == 'C_CDM_53':40 available_2 = int(reading[-2:],16)41 print 'READER: 1 coin $2.0, in tube: {}'.format(available_2)42 self.deposited_2 = True43 elif reading[:-2] == 'C_CDM_54':44 available_5 = int(reading[-2:],16)45 print 'READER: 1 coin $5.0, in tube: {}'.format(available_5)46 self.deposited_5 = True47 elif reading[:-2] == 'C_CDM_45':48 print 'READER: 1 coin $10.0 deposited to hopper'49 self.deposited_10 = True50 elif reading[:6] == 'C_TUB_':51 self.tubes.full_tubes = reading[7:10]52 self.tubes.tube_50c = int(reading[11:12], 16)53 self.tubes.tube_1 = int(reading[13:16])54 self.tubes.tube_2 = int(reading[17:18], 16)55 self.tubes.tube_5 = int(reading[19:20], 16)56 #Check for hopper availability57 elif reading == 'H_STA_OK<':58 self.hopper_ok = True59 elif reading == 'H_STA_ERROR<':60 self.hopper_ok = False61 print 'READER: Full tubes: {}'.format(self.tubes.full_tubes)62 print 'READER: Coins on tube_50c: {}'.format(self.tubes.tube_50c)63 print 'READER: Coins on tube_1: {}'.format(self.tubes.tube_1)64 print 'READER: Coins on tube_2: {}'.format(self.tubes.tube_2)65 print 'READER: Coins on tube_5: {}'.format(self.tubes.tube_5)66 #Check for Hopper/Changer error responses67 elif reading[:-2] == 'H_DIS_TIMEOUT_':68 self.dispense_error = True69 elif reading[:-1] == 'C_DIS_ERROR':70 self.dispense_error = True71 reading = ''72 else:73 reading += data74 except serial.SerialException:75 print 'Reader: Error, exception'76class WriteThread(threading.Thread):77 def __init__(self, thread_id, name):78 threading.Thread.__init__(self)79 self.thread_id = thread_id80 self.name = name81 def write_cmd(self, cmd=Commands.VmcCommands()):82 try:83 if serial_port.isOpen():84 serial_port.write(cmd['request'])85 print 'Sent: {} cmd'.format(cmd['name'])86 except serial.SerialException:87 print 'Exception, data not written'88 time.sleep(0.5)89class Changer(threading.Thread):90 read_thread = ReadThread(1, 'Reader')91 write_thread = WriteThread(2, 'Writer')92 commands = Commands.VmcCommands()93 number_of_coins = 100 #dummy data for coins on hopper94 error_amount = 095 def update_data(self):96 self.write_thread.write_cmd(self.commands.C_TUBE_STATUS)97 av_50c = self.read_thread.tubes.tube_50c98 av_1 = self.read_thread.tubes.tube_199 av_2 = self.read_thread.tubes.tube_2100 av_5 = self.read_thread.tubes.tube_5101 coins_on_hopper = self.number_of_coins102 data = [{"currency_id":1,"currency_name":"50 centavos","currency_quantity":av_50c},103 {"currency_id":2,"currency_name":"1 peso","currency_quantity":av_1},104 {"currency_id": 3,"currency_name":"2 pesos","currency_quantity":av_2},105 {"currency_id":4,"currency_name":"5 pesos","currency_quantity":av_5},106 {"currency_id":5,"currency_name":"10 pesos","currency_quantity":coins_on_hopper}]107 json_data = json.dumps(data, default=lambda o: o.__dict__,108 sort_keys= True, indent=4)109 url = "http://localhost:8000/Currency/"110 print "Uploading to " + url111 headers = {"Authorization": "Basic YmVjYXJpbzpiZWNhcmlv",112 "Content-Type": "application/json"}113 for id in xrange(1,5):114 json_data = json.dumps(data[id], default=lambda o: o.__dict__,115 sort_keys= True, indent=4)116 url = "http://localhost:8000/Currency/" + str(id) + "/"117 request = requests.put(url, data = json_data, headers = headers)118 def start_changer(self):119 self.write_thread.write_cmd(self.commands.C_SETUP)120 self.write_thread.write_cmd(self.commands.C_TUBE_STATUS)121 self.write_thread.write_cmd(self.commands.C_EXPANSION)122 self.write_thread.write_cmd(self.commands.disable_tubes())123 self.update_data()124 def write_dispense(self, units=0, cents=0):125 """ Writes a Dispense command. Also evaluates which coins should be dispensed. """126 # Evaluate coins.127 res = int(units)128 quantity_10 = res / 10129 res %= 10130 quantity_5 = res / 5131 res %= 5132 quantity_2 = res / 2133 res %= 2134 quantity_1 = res / 1135 quantity_50c = int(cents) / 5136 # Dispense coins depending on the coins it should dispense.137 print "Dispensing " + str(quantity_10) + " coins of 10"138 print "Dispensing " + str(quantity_5) + " coins of 5"139 print "Dispensing " + str(quantity_2) + " coins of 2"140 print "Dispensing " + str(quantity_1) + " coins of 1"141 print "Dispensing " + str(quantity_50c) + " coins of 50c"142 self.write_thread.write_cmd(self.commands.enable_tubes())143 #Check tubes status144 self.write_thread.write_cmd(self.commands.C_TUBE_STATUS)145 #Get available coins on each tube146 av_50c = self.read_thread.tubes.tube_50c147 print "Available 50c = " + str(av_50c)148 av_1 = self.read_thread.tubes.tube_1149 print "Available 1 = " + str(av_1)150 av_2 = self.read_thread.tubes.tube_2151 print "Available 2 = " + str(av_2)152 av_5 = self.read_thread.tubes.tube_5153 print "Available 5 = " + str(av_5)154 #Chopper155 if quantity_10 != 0:156 print "Dispensing Hopper coins = " + str(quantity_10)157 self.write_thread.write_cmd(self.commands.check_hopper())158 self.write_thread.write_cmd(self.commands.hopper_dispense(quantity_10))159 self.number_of_coins -= quantity_10160 if quantity_5 != 0:161 print "Dispensing Changer coins = " + str(quantity_5)162 self.write_thread.write_cmd(self.commands.changer_dispense(4, quantity_5))163 if quantity_2 != 0:164 print "Dispensing Changer coins = " + str(quantity_2)165 self.write_thread.write_cmd(self.commands.changer_dispense(3, quantity_2))166 if quantity_1 != 0:167 print "Dispensing Changer coins = " + str(quantity_1)168 self.write_thread.write_cmd(self.commands.changer_dispense(2, quantity_1))169 if quantity_50c != 0:170 print "Dispensing Changer coins = " + str(quantity_50c)171 self.write_thread.write_cmd(self.commands.changer_dispense(0, quantity_50c))172 #Abonar173 """174 if quantity_10 != 0 and quantity_10 <= self.number_of_coins:175 print "Dispensing Hopper coins = " + str(quantity_10)176 self.write_thread.write_cmd(self.commands.check_hopper())177 self.write_thread.write_cmd(self.commands.hopper_dispense(quantity_10))178 self.number_of_coins -= quantity_10179 elif quantity_10 != 0 and quantity_10 > self.number_of_coins:180 self.error_amount += quantity_10*10181 self.read_thread.dispense_error = True182 print "10 " + str(self.error_amount)183 if quantity_5 != 0 and quantity_5 <= av_5:184 print "Dispensing Changer coins = " + str(quantity_5)185 self.write_thread.write_cmd(self.commands.changer_dispense(4, quantity_5))186 if self.read_thread.dispense_error:187 self.error_amount -= quantity_5*5188 elif quantity_5 != 0 and quantity_2 > av_5:189 self.error_amount += quantity_5*5190 self.read_thread.dispense_error = True191 print "5 " + str(self.error_amount)192 if quantity_2 != 0 and quantity_2 <= av_2:193 print "Dispensing Changer coins = " + str(quantity_2)194 self.write_thread.write_cmd(self.commands.changer_dispense(3, quantity_2))195 if self.read_thread.dispense_error:196 self.error_amount -= quantity_2*2197 elif quantity_2 != 0 and quantity_2 > av_2:198 self.error_amount += quantity_2*2199 self.read_thread.dispense_error = True200 print "2 " + str(self.error_amount)201 if quantity_1 != 0 and quantity_1 <= av_1:202 print "Dispensing Changer coins = " + str(quantity_1)203 self.write_thread.write_cmd(self.commands.changer_dispense(2, quantity_1))204 if self.read_thread.dispense_error:205 self.error_amount -= quantity_1206 elif quantity_1 != 0 and quantity_1 > av_1:207 self.error_amount += quantity_1208 self.read_thread.dispense_error = True209 print " 1 " +str(self.error_amount)210 if quantity_50c != 0 and quantity_50c <= av_50c:211 print "Dispensing Changer coins = " + str(quantity_50c)212 self.write_thread.write_cmd(self.commands.changer_dispense(0, quantity_50c))213 if self.read_thread.dispense_error:214 self.error_amount -= quantity_50c*.5215 elif quantity_50c != 0 and quantity_50c > av_50c:216 self.error_amount += quantity_50c*.5217 self.read_thread.dispense_error = True218 print "50c " + str(self.error_amount)219 """220 self.write_thread.write_cmd(self.commands.disable_tubes())221 def write_accept(self):222 print('ready to accept coins')223 if(not self.read_thread.deposited_50c) and (not self.read_thread.deposited_1) \224 and (not self.read_thread.deposited_2) and (not self.read_thread.deposited_5)\225 and (not self.read_thread.deposited_10):226 pass227 elif self.read_thread.deposited_50c:228 self.read_thread.deposited_50c = False229 return 0.5230 elif self.read_thread.deposited_1:231 self.read_thread.deposited_1 = False232 return 1.0233 elif self.read_thread.deposited_2:234 self.read_thread.deposited_2 = False235 return 2.0236 elif self.read_thread.deposited_5:237 self.read_thread.deposited_5 = False238 return 5.0239 elif self.read_thread.deposited_10:240 self.number_of_coins += 1241 self.read_thread.deposited_10 = False242 return 10.0243 else:244 return 0245 def socket_com(self, command=''):246 cmd, param1 = command.split()247 if cmd == 'DISPENSE':248 self.update_data()249 units, cents = param1.split('.')250 self.write_dispense(units, cents)251 return 'OK'252 elif cmd == 'ACCEPT':253 print 'command accept'254 return self.write_accept()255 else:256 pass257 def __init__(self, changer_port):258 super(Changer, self).__init__()259 global serial_port260 serial_port = changer_port261 def run(self):262 print 'Starting serial port'263 #self.start_serial() // Not used because changes to protocol264 print 'Starting threads'265 self.read_thread.start()...

Full Screen

Full Screen

STM32_Bootloader_Host.py

Source:STM32_Bootloader_Host.py Github

copy

Full Screen

1import time2import serial 3from serial import SerialException4import threading5# -----------------------------------------------------------------------------------6class readbusThread (threading.Thread):7 nbytes = 08 trig = 09 kill = 010 def __init__(self, connection):11 threading.Thread.__init__(self)12 self.connection = connection13 def run(self):14 while self.kill == 0:15 if self.trig == 1:16 self.rec_bytes = self.connection.read(self.nbytes)17 self.trig = 018# -----------------------------------------------------------------------------------19class stm32_bootloader_protocol :20 def __init__(self, port):21 self.ACK = 0x7922 self.NACK = 0x1F23 try:24 self.ser = serial.Serial(port)25 self.ser.baudrate = 11520026 self.ser.parity = serial.PARITY_EVEN27 self.ser.stopbits = serial.STOPBITS_ONE28 self.ser.timeout = 1029 self.read_thread = readbusThread(self.ser)30 self.read_thread.start()31 self.chip_id = [0]*1232 self.port = True33 except SerialException:34 self.port = False35 36 # Initialize Protocol:37 # Initializes the firmware upgrade protocol.38 def init_protocol(self):39 resp = False40 self.read_thread.nbytes = 141 self.read_thread.trig = 142 self.ser.write(bytes([0x7F])) # Start communication43 while self.read_thread.trig == 1:44 pass45 if len(self.read_thread.rec_bytes) == self.read_thread.nbytes:46 if self.read_thread.rec_bytes[0] == self.ACK:47 resp = True48 return resp49 # Get command:50 # The Get command allows the user to get the version of the bootloader and the supported51 # commands.52 def get(self):53 print('Get Command Resp: ')54 resp = False55 cmd = [0x00, 0xFF]56 self.read_thread.nbytes = 157 self.read_thread.trig = 158 self.ser.write(bytes(cmd)) 59 while self.read_thread.trig == 1:60 pass61 if len(self.read_thread.rec_bytes) == self.read_thread.nbytes:62 if self.read_thread.rec_bytes[0] == self.ACK:63 self.read_thread.trig = 164 while self.read_thread.trig == 1:65 pass66 self.read_thread.nbytes = self.read_thread.rec_bytes[0] + 267 self.read_thread.trig = 168 while self.read_thread.trig == 1:69 pass70 for i in range(len(self.read_thread.rec_bytes) -1 ):71 print('\tByte %i: 0X%x '%(i+3, self.read_thread.rec_bytes[i]))72 if len(self.read_thread.rec_bytes) == self.read_thread.nbytes:73 if self.read_thread.rec_bytes[-1] == self.ACK:74 resp = True75 print('')76 return resp77 # Get Version:78 # The Get Version & Read Protection Status command is used to get the bootloader version79 # and the read protection status.80 def get_ver(self):81 print('Get Version Command Resp: ')82 resp = False83 cmd = [0x01, 0xFE]84 self.read_thread.nbytes = 485 self.read_thread.trig = 186 self.ser.write(bytes(cmd)) 87 while self.read_thread.trig == 1:88 pass89 if len(self.read_thread.rec_bytes) == self.read_thread.nbytes:90 if self.read_thread.rec_bytes[0] == self.ACK:91 for i in range(1,len(self.read_thread.rec_bytes)):92 print('\tByte %i: 0X%x '%(i+1, self.read_thread.rec_bytes[i]))93 if len(self.read_thread.rec_bytes) == self.read_thread.nbytes:94 if self.read_thread.rec_bytes[-1] == self.ACK:95 resp = True96 print('')97 return resp98 # Get ID:99 # The Get ID command is used to get the version of the chip ID (identification). When the100 # bootloader receives the command, it transmits the product ID to the host.101 def get_id(self):102 print('Get ID Command Resp: ')103 resp = False104 cmd = [0x02, 0xFD]105 self.read_thread.nbytes = 3106 self.read_thread.trig = 1107 self.ser.write(bytes(cmd)) 108 while self.read_thread.trig == 1:109 pass110 if len(self.read_thread.rec_bytes) == self.read_thread.nbytes:111 if self.read_thread.rec_bytes[0] == self.ACK:112 print('\tByte 2: 0X%x '%self.read_thread.rec_bytes[2])113 self.read_thread.nbytes = self.read_thread.rec_bytes[2] + 1114 self.read_thread.trig = 1115 while self.read_thread.trig == 1:116 pass117 for i in range(len(self.read_thread.rec_bytes)):118 print('\tByte %i: 0X%x '%(i+3, self.read_thread.rec_bytes[i]))119 if len(self.read_thread.rec_bytes) == self.read_thread.nbytes:120 if self.read_thread.rec_bytes[-1] == self.ACK:121 resp = True122 print('')123 return resp124 # Get Device ID:125 # Reads the unique device id from the memory. Please note that base id might be different126 # for other STM32 microcontrollers.127 def get_device_id(self):128 base_address_h7 = [0x1F, 0xF1, 0xE8, 0x00] 129 self.read_mem(self.chip_id, base_address_h7, 12)130 return bytes(self.chip_id)131 # Read Memory:132 # The Read Memory command is used to read data from any valid memory address in RAM, 133 # Flash memory and the information block (system memory or option byte areas).134 def read_mem(self, buff, address_bytes, nbytes_to_read):135 resp = False136 cmd = [0x11, 0xEE]137 self.read_thread.nbytes = 1138 self.read_thread.trig = 1139 self.ser.write(bytes(cmd)) 140 while self.read_thread.trig == 1:141 pass142 if len(self.read_thread.rec_bytes) == self.read_thread.nbytes:143 if self.read_thread.rec_bytes[0] == self.ACK:144 address_check_sum = 0145 for i in range(4):146 address_check_sum = address_check_sum ^ address_bytes[i]147 self.read_thread.nbytes = 1148 self.read_thread.trig = 1149 self.ser.write(bytes(address_bytes + [address_check_sum]))150 while self.read_thread.trig == 1:151 pass152 if len(self.read_thread.rec_bytes) == self.read_thread.nbytes:153 if self.read_thread.rec_bytes[0] == self.ACK:154 self.read_thread.nbytes = nbytes_to_read + 1155 self.read_thread.trig = 1156 self.ser.write(bytes([nbytes_to_read-1] + [(nbytes_to_read-1) ^ 0xFF])) 157 while self.read_thread.trig == 1:158 pass159 if len(self.read_thread.rec_bytes) == self.read_thread.nbytes:160 if self.read_thread.rec_bytes[0] == self.ACK:161 # buff = self.read_thread.rec_bytes[1:]162 for i in range(nbytes_to_read):163 buff[i] = self.read_thread.rec_bytes[i+1]164 resp = True165 return resp166 # Go:167 # The Go command is used to execute the downloaded code or any other code by branching168 # to an address specified by the application. 169 def go(self, address):170 pass171 # Write Memory:172 # The Write Memory command is used to write data to any valid memory address173 # i.e. RAM, Flash memory, or option byte area.174 def write_mem(self, firmware):175 resp = False176 if len(firmware)%4 == 0:177 percent = 0178 nb_packets = len(firmware)//256179 if len(firmware)%256 != 0:180 nb_packets = nb_packets + 1181 cmd = [0x31, 0xCE]182 buff = [i for i in firmware]183 download_cplt = False184 address = 0x08000000185 while download_cplt == False:186 resp = False # Reset resp flag before each write process187 data_to_write = buff[:256]188 address_bytes = [(address>>24) & 0xFF, (address>>16) & 0xFF, (address>>8) & 0xFF, (address) & 0xFF]189 address_check_sum = 0190 for i in range(4):191 address_check_sum = address_check_sum ^ address_bytes[i]192 address = address + 256193 check_sum = len(data_to_write)-1194 for i in range(len(data_to_write)):195 check_sum = check_sum ^ data_to_write[i]196 self.read_thread.nbytes = 1197 self.read_thread.trig = 1198 self.ser.write(bytes(cmd)) 199 while self.read_thread.trig == 1:200 pass201 if len(self.read_thread.rec_bytes) == self.read_thread.nbytes:202 if self.read_thread.rec_bytes[0] == self.ACK:203 self.read_thread.trig = 1204 self.ser.write(bytes(address_bytes + [address_check_sum]))205 while self.read_thread.trig == 1:206 pass207 if len(self.read_thread.rec_bytes) == self.read_thread.nbytes:208 if self.read_thread.rec_bytes[0] == self.ACK:209 self.read_thread.nbytes = 1210 self.read_thread.trig = 1211 self.ser.write(bytes([len(data_to_write)-1] + data_to_write + [check_sum]))212 while self.read_thread.trig == 1:213 pass214 if len(self.read_thread.rec_bytes) == self.read_thread.nbytes:215 if self.read_thread.rec_bytes[0] == self.ACK:216 resp = True217 buff = buff[256:]218 percent = percent + 1219 print('Progress = %i%%'%((percent/nb_packets)*100))220 if len(buff) == 0 or resp == False:221 download_cplt = True222 return resp223 224 # Erase Memory:225 # The Erase Memory command allows the host to erase Flash memory pages.226 def erase_mem(self,number_of_pages, page_numbers):227 pass228 # Extended Full Erase:229 # The Extended Erase Memory command allows the host to erase Flash memory pages using230 # two bytes addressing mode.231 def extended_full_erase(self):232 resp = False233 cmd = [0x44, 0xBB]234 self.read_thread.nbytes = 1235 self.read_thread.trig = 1236 self.ser.write(bytes(cmd)) 237 while self.read_thread.trig == 1:238 pass239 if len(self.read_thread.rec_bytes) == self.read_thread.nbytes:240 if self.read_thread.rec_bytes[0] == self.ACK:241 self.read_thread.trig = 1242 self.ser.timeout = 120243 self.ser.write(bytes([0xFF, 0xFF, 0x00])) 244 while self.read_thread.trig == 1:245 pass246 if len(self.read_thread.rec_bytes) == self.read_thread.nbytes:247 if self.read_thread.rec_bytes[0] == self.ACK:248 resp = True249 self.ser.timeout = 4250 return resp251 252 # Write Protect:253 # The Write Protect command is used to enable the write protection for some or all Flash254 # memory sectors. 255 def write_protect(self):256 pass257 # Write Unprotect:258 # The Write Unprotect command is used to disable the write protection of all the Flash259 # memory sectors. 260 def write_unprotect(self):261 pass262 # Readout Protect:263 # The Readout Protect command is used to enable the Flash memory read protection. 264 def read_protect(self):265 resp = False266 cmd = [0x82, 0x7D]267 self.read_thread.nbytes = 2268 self.read_thread.trig = 1269 self.ser.write(bytes(cmd)) 270 while self.read_thread.trig == 1:271 pass272 if len(self.read_thread.rec_bytes) == self.read_thread.nbytes:273 if self.read_thread.rec_bytes[0] == self.ACK and self.read_thread.rec_bytes[1] == self.ACK:274 resp = True275 return resp276 # Readout Unprotect:277 # The Readout Unprotect command is used to disable the Flash memory read protection. 278 def read_unprotect(self):279 resp = False280 cmd = [0x92, 0x6D]281 self.read_thread.nbytes = 2282 self.read_thread.trig = 1283 self.ser.write(bytes(cmd)) 284 while self.read_thread.trig == 1:285 pass286 if len(self.read_thread.rec_bytes) == self.read_thread.nbytes:287 if self.read_thread.rec_bytes[0] == self.ACK and self.read_thread.rec_bytes[1] == self.ACK:288 resp = True289 return resp290 291 # Close:292 # Closes all the open files, threads and ports.293 def close(self):294 self.read_thread.kill = 1295 self.ser.close()...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run locust automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful