How to use just_return method in refurb

Best Python code snippet using refurb_python

orders_v3_01.py

Source:orders_v3_01.py Github

copy

Full Screen

1import format_random_v2_00 as fm2import re3import couriers_v3_01 as cour4import products_v1_02 as prdct5import pymysql6import os7import random8from dotenv import load_dotenv9import colorama10from colorama import Fore, Back, Style11colorama.init(autoreset=True)12 13## END IMPORTS14# Load environment variables from .env file15load_dotenv()16host = os.environ.get("mysql_host")17user = os.environ.get("mysql_user")18password = os.environ.get("mysql_pass")19database = os.environ.get("mysql_db")20# Establish a database connection21connection = pymysql.connect(22 host = host,23 user = user,24 password = password,25 database = database26)27def get_from_db(command):28 cursor = connection.cursor()29 cursor.execute(f"{command}") 30 myresult = cursor.fetchall()31 #.commit()32 #cursor.close()33 #connection.close()34 return(myresult)35def read_from_db():36 cursor = connection.cursor()37 cursor.execute("SELECT * FROM couriers") 38 myresult = cursor.fetchall()39 for x in myresult:40 print(x)41 #connection.commit()42 #cursor.close()43 #connection.close()44def add_to_db(command):45 cursor = connection.cursor()46 cursor.execute(f"{command}") 47 connection.commit()48 #cursor.close()49 #connection.close()50# CLASSES ##################################################################################################################################################################51# ORDERS CLASS #################################52class Orders:53 """ Orders class stores each order object in a 'global' list, customer name, address, phone + courier id + product ids + order status and (new) tot price """54 orders_list = [] # list to store all instances of each courier55 orders_id_cache = [] # for ensuring the same orders id is NEVER used again, this is of critical importance imo56 # init constructor57 def __init__(self, customer_name:str, customer_address:str, customer_phone:str, order_status:str, order_price:float, order_id:int, courier_id:int = None, products_ids:list = []): 58 """ Load or create new courier with name, phone numb, & id numb """59 60 def get_valid_order_id():61 """ if the id you want to use is in the couriers_id_cache then create a new one """62 current_value = int(max(self.orders_id_cache)) + 1 63 missing_elements = [ele for ele in range(max(self.orders_id_cache)+1) if ele not in self.orders_id_cache and ele != 0] 64 if missing_elements: # has items65 self.order_id = missing_elements[0]66 else: 67 self.order_id = current_value 68 self.orders_id_cache.append(self.order_id)69 #END IF70 #END INLINE FUNCTION71 # varaibles, set regardless of load or create (so all except id)72 self.customer_name = customer_name73 self.customer_address = customer_address74 self.customer_phone = customer_phone75 self.order_status = order_status76 self.order_price = order_price77 self.courier_id = courier_id78 self.products_ids = products_ids # leaving as blank for now as you will set it yourself, tho obvs if we have it then we load it79 # for differences between loading existing and creating new80 if order_id: # if id has a value, this means you have existing data to use when creating this new object81 self.order_id = order_id82 83 if self.order_id in self.orders_id_cache:84 # if there is a ID clash then update the id85 print(f"[ Potential ID Clash ({self.order_id}) Averted ]")86 get_valid_order_id()87 else:88 self.order_id = order_id89 self.orders_id_cache.append(self.order_id)90 self.orders_list.append(self)91 print(f"#{self.order_id} {self.customer_name} - {self.customer_phone} - {self.order_status} - {self.customer_address} {courier_id} Loaded") # TO ADD A BOOL PARAM FOR SHOWING THIS PRINT STATEMENT?92 else:93 # you are creating from scratch, so you need a new, dynamically created order_id94 if self.orders_id_cache: # has items95 get_valid_order_id()96 else:97 # if the cache has no items then its the first ite, so if its a brand new courier their number will be one (ooooo lucky you huh)98 self.order_id = 199 self.orders_id_cache.append(self.order_id)100 # append it to the "global" list and print back confirmation101 self.orders_list.append(self)102 print(f"Order #{self.order_id} Created\nCUSTOMER INFO\nName: {self.customer_name}, Phone: {self.customer_phone}, Address: {self.customer_address}\nORDER INFO") # TO ADD A BOOL PARAM FOR SHOWING THIS PRINT STATEMENT?103 print(f"Order Status: {self.order_status}, Assigned Courier: {courier_id}, Order Price: £{self.order_price}")104 print(f"Order Info For Restaurant : {self.products_ids}")105 #END IF106 #END INIT 107# PRINT FUNCTIONS108 # COURIERS - THIS NEEDS TO BE FIXED AGAIN BUT CBA109 # simple couriers print, code copied (could have just called the function knobhead!)110 def items_per_list_print_couriers(disp_size: int=22, rows = 3):111 #ipl = rows #da fuck112 usable_screen = int(disp_size) - 10113 ipl = usable_screen114 i_list = []115 # function116 # creates the initial lists with just ints of the order117 for g in range(ipl):118 x = g # if needs to be plus 1 then do here and also in x < len(main_co_list) + 1119 g_list = []120 for i in range(int(len(cour.Couriers.couriers_list)/ipl) + 1): # rows? needed (for X per line)121 if x < len(cour.Couriers.couriers_list):122 g_list.append(x)123 x += ipl124 i_list.append(g_list)125 x += 1126 # END FOR127 # END FOR 128 # filling in the above for loop with the data you want129 for short_list in i_list: 130 print_string = ""131 for index in short_list:132 cr = cour.Couriers.couriers_list[index] 133 live_ords = get_live_orders_for_courier(index)134 current_string = (f"{index} {cour.Couriers.couriers_list[index].name} {cr.location} {live_ords}")135 spaces = 36 - (len(current_string))136 spaces_string = ""137 if int(index) + 1 == 10:138 spaces -= 1139 for x in range(spaces):140 spaces_string += " "141 print_string += (f"[ {int(index) + 1} ] {cr.name} {spaces_string} ({live_ords}) - {cr.location} ")142 print(print_string)143 # ORDERS144 def print_orders(self):145 fm.format_display(then_text = " PRINT ORDERS ".center(60, '-'))146 # if the list is empty prompt user to load the default list (otherwise what you printing huh bud?)147 if len(self.orders_list) < 1:148 print("No Orders List - WTF?!") # you are prompted in here, if you say no then it returns an empty list149 fm.format_display()150 # if the orders list isn't empty do the print stuff below (you had a chance to just load a default one so this was ur call)151 if len(self.orders_list) >= 1: # yes this is if not elif intentionally, we want to check twice if the orders list is empty at the start152 print("")153 #fm.format_display()154 print("Printing Default Orders List [ {} Total Orders ]".format(len(self.orders_list)))155 fm.print_dashes()156 if len(self.orders_list) >= 3:157 print("[ 1 ] = Search For Order Number")158 fm.print_dashes()159 print("[ 2 ] = Scroll Orders List")160 fm.print_dashes()161 one_or_two = input("Your Selection : ")162 fm.print_dashes()163 # SEARCH - PROBS (LIKE PROBLY DEFO) NEEDS GET ATTR UPDATE WHICH IS FINE (not tested at all, expecting will error like fuck)164 if one_or_two == "1": 165 fm.format_display(end_with_dashes = True)166 wanted_order = int(input("Enter An Order Number [#1 - #{}] : ".format(len(self.orders_list))))167 fm.format_display() 168 print(" ORDER #{} FOR {} ".format(wanted_order, self.orders_list[wanted_order - 1]["customer_name"]).center(60, '-'))169 print("")170 print(" Customer Name = {}".format(self.orders_list[wanted_order - 1]["customer_name"]))171 print(" Customer Address = {}".format(self.orders_list[wanted_order - 1]["customer_address"]))172 print(" Customer Phone Number = {}".format(self.orders_list[wanted_order - 1]["customer_phone"]))173 print(" Courier Number = {}".format(self.orders_list[wanted_order - 1]["courier_number"]))174 print(" Order Status = {}".format(self.orders_list[wanted_order - 1]["order_status"]))175 print(" Order Price = {}".format(self.orders_list[wanted_order - 1]["order_price"]))176 print(" For Restaurant = {}".format(self.orders_list[wanted_order - 1]["products_ids"]))177 print("")178 faux_input = input("Press Enter To Continue : ")179 elif one_or_two == "2": # SCROLL180 # prints by scrolling through the items181 # could always add skip 5 if order list is say over 20182 for order_number, order in enumerate(self.orders_list): 183 fm.format_display() 184 print(" ORDER #{} ".format(order_number + 1).center(60, '-'))185 print(f" ORDER #{getattr(order,'order_id')} FOR {getattr(order,'customer_name')}")186 print("")187 print(f" Customer Name = {getattr(order,'customer_name')}")188 print(f" Customer Address = {getattr(order,'customer_address')}")189 print(f" Customer Phone Number = {getattr(order,'customer_phone')}") 190 print(f" Courier ID = {getattr(order,'courier_id')}")191 print(f" Order Status = {getattr(order,'order_status')}")192 print("")193 #print_dashes(30)194 fm.print_dashes(59, "not spaced")195 print("")196 faux_input = "" # storing the variable so it can be checked for an escape key (m)197 if order_number + 1 < len(self.orders_list):198 print("")199 print("")200 print("(enter m for menu)")201 faux_input = input("Press Enter To Print Next Order ({}/{}): ".format(order_number + 1, len(self.orders_list)))202 else:203 fm.print_dashes()204 print("NO MORE ORDERS ({}/{})".format(order_number + 1, len(self.orders_list)))205 fm.print_dashes()206 faux_input = input("Press Enter To Continue : ")207 if faux_input == "M" or faux_input == "m":208 # if the faux input is M then break (don't keep printing the loop)209 break210 else:211 # it found no list checking first, it asked you to load one, you said no, so it aint guna do nutting duh!212 fm.print_dashes()213 print("Nothing To Print, You Don't Wanna Load...")214 fm.print_dashes()215 faux_input = input("Press Enter To Continue : ")216''' FOR THE GET_ATTR WAY217 for i, courier in enumerate(cour.Couriers.couriers_list):218 x = getattr(courier, "name")219 z = getattr(courier, "location")220 # BRUHHH THEN CAN ZIP WITH ORDERS INFO NAH DONT NEED TO ZIP JUST CHECK AGAINST AND GET THE INFO HERE, WELL HAVE A FUNCTION CALL TO DO THAT NOT IN LOOP BUT BOSH!221 print(f"[ {i} ] - {x}, Location - {z}")222'''223# print(f"Order #{self.order_id} Created\nCUSTOMER INFO\nName: {self.customer_name}, Phone: {self.customer_phone}, Address: {self.customer_address}\nORDER INFO") # TO ADD A BOOL PARAM FOR SHOWING THIS PRINT STATEMENT?224# print(f"Order Status: {self.order_status}, Assigned Courier: {courier_id}, Order Price: £{self.order_price}")225# print(f"Order Info For Restaurant : {self.products_ids}")226## MAIN PROGRAM ############################################################################################################################################################227## NEW DB PRINT STUFF ######################################################################################################################################################228def make_address_readable(an_address:str):229 # should add if no comma just clip it at X chars (like 15 or sumnt)230 if an_address:231 ndx = an_address.find(",")232 if ndx != -1:233 new_addy = an_address[:ndx]234 return(new_addy)235 elif len(an_address) > 18:236 return(an_address[:18])237def get_couriers_name_from_id(ndx:int):238 query = f"SELECT c.name FROM couriers c WHERE c.courier_db_id = {ndx}"239 result = str(get_from_db(query))240 ndx = result.find("'",3)241 final_result = result[3:ndx]242 return(final_result.strip())243def db_print_orders(disp_size: int=22, rows = 3):244 usable_screen = int(disp_size) - 15245 ipl = usable_screen246 length_of_couriers = get_from_db(f'SELECT * FROM orders')247 length_of_couriers = len(length_of_couriers)248 total_pages = int(length_of_couriers / ipl)249 if (length_of_couriers % ipl) != 0:250 total_pages += 1251 final_page = len(range(total_pages))252 print(f"FINAL PAGE : {final_page}")253 print(f"Total Pages = {total_pages}")254 pages_display = [f"[ {x+1} ]" for x in range(total_pages)]255 pages_display = " - ".join(pages_display)256 fm.format_display(disp_size)257 want_more_print = True258 current_page = 1259 dont_print = False260 query = f'SELECT * FROM orders LIMIT {ipl}'261 #print(f"display size = {disp_size}")262 #print(f"display size = {rows}")263 def create_spaces(string_to_space:str = "Some Default String", space_count:int = 30, just_return:int = None):264 if just_return:265 return_spaces = ""266 for _ in range(just_return):267 return_spaces += " "268 return(return_spaces)269 else:270 to_space = len(string_to_space)271 final_space_count = space_count - to_space 272 string_of_spaces = ""273 for x in range(final_space_count):274 string_of_spaces += " "275 return(string_of_spaces)276 while want_more_print: 277 #query = query278 result = get_from_db(query)279 if dont_print:280 pass281 else:282 print(f"{Style.BRIGHT}Customer {create_spaces(just_return=22)} Order Status{create_spaces(just_return=24)} Order Delivery{create_spaces(just_return=35)} Customer Contact{create_spaces(just_return=9)}For Restaurant (product number, quantity)")283 print(f"{fm.print_dashes(return_it=True)} {create_spaces(just_return=8)} {fm.print_dashes(return_it=True)} {create_spaces(just_return=13)} {fm.print_dashes(return_it=True)} {create_spaces(just_return=26)} {fm.print_dashes(amount_of_dashes=7, return_it=True)}{create_spaces(just_return=8)} {fm.print_dashes(return_it=True)}")284 for order_info in result:285 x = f"[ {order_info[0]} ] {order_info[1]} {order_info[3]} {order_info[4]} {make_address_readable(order_info[2])} {order_info[5]} {get_couriers_name_from_id(order_info[7])}" #{order_info[8]}286 display1 = f"[ {order_info[0]} ] {order_info[1]}"287 display2 = f"- £{order_info[5]} : {order_info[4]}" 288 display3 = f"- {get_couriers_name_from_id(order_info[7])} - @ {make_address_readable(order_info[2])}" 289 display4 = f"@ {order_info[3]}" 290 spaces1 = create_spaces(display1, 30)291 spaces2 = create_spaces(display2, 30)292 spaces3 = create_spaces(display3, 48)293 spaces4 = create_spaces(display4, 25)294 #current_str = len(x)295 #spaces = 90 - current_str296 #spaces_string = ""297 #for x in range(spaces):298 # spaces_string += " "299 #print(courier)300 301 if dont_print:302 pass303 #print("You've Entered The Wrong Page Number")304 else:305 print(f"{Fore.BLUE}[ {Fore.CYAN}{order_info[0]} {Fore.BLUE}] {Fore.RESET}{order_info[1]} {spaces1} £{order_info[5]} [PAID] - {order_info[4]} {spaces2} {get_couriers_name_from_id(order_info[7])} -> to {make_address_readable(order_info[2])} {spaces3} {order_info[3]} {spaces4} {order_info[8]}") # {order_info[8]}306 if dont_print:307 pass308 else:309 print(f"{fm.print_dashes(return_it=True)} {create_spaces(just_return=8)} {fm.print_dashes(return_it=True)} {create_spaces(just_return=13)} {fm.print_dashes(return_it=True)} {create_spaces(just_return=26)} {fm.print_dashes(amount_of_dashes=7, return_it=True)}{create_spaces(just_return=8)} {fm.print_dashes(return_it=True)}")310 print("") 311 #print(pages_display)312 page_numbs = [f"{x+1}" for x in range(total_pages)]313 #print(f"current_page = {current_page}") 314 #print(f"page_numbs = {page_numbs}") 315 316 highlight_page = lambda h : f"{Fore.BLUE}[[ {Fore.CYAN}{h}{Fore.BLUE} ]]" if h == str(current_page) else f"[ {h} ]" # language is so mad wtf 317 print(*[highlight_page(p) for p in page_numbs])318 print("") 319 paginate_action = (input("Enter A Page Number Or 0 To Quit : "))320 #print(current_page)321 #print(final_page)322 #print(paginate_action)323 fm.print_dashes()324 g = (int(paginate_action))325 if g > final_page:326 fm.format_display(disp_size)327 cour.return_one_line_art()328 current_page = int(paginate_action)329 dont_print = True330 elif paginate_action != "0":331 fm.format_display(disp_size)332 current_page = int(paginate_action)333 query = f'SELECT * FROM orders WHERE order_id > {(current_page - 1) * ipl} LIMIT {ipl}'334 dont_print = False335 else:336 #fm.format_display(disp_size)337 print("Ok Bye")338 want_more_print = False339############################################################################################################################################340def db_print_orders_for_every_courier(disp_size: int=22, rows = 3):341 usable_screen = int(disp_size) - 10342 ipl = usable_screen343 length_of_couriers = get_from_db(f'SELECT o.customer_name, o.customer_address, o.order_price, o.order_id, o.order_status, c.name, c.courier_db_id FROM orders o RIGHT JOIN couriers c ON o.courier_id = c.courier_db_id ORDER BY c.courier_db_id')344 length_of_couriers = len(length_of_couriers)345 total_pages = int(length_of_couriers / ipl)346 if (length_of_couriers % ipl) != 0:347 total_pages += 1348 final_page = len(range(total_pages))349 print(f"FINAL PAGE : {final_page}")350 print(f"Total Pages = {total_pages}")351 pages_display = [f"[ {x+1} ]" for x in range(total_pages)]352 pages_display = " - ".join(pages_display)353 fm.format_display(disp_size)354 want_more_print = True355 current_page = 1356 dont_print = False357 query = f'SELECT o.customer_name, o.customer_address, o.order_price, o.order_id, o.order_status, c.name, c.courier_db_id FROM orders o RIGHT JOIN couriers c ON o.courier_id = c.courier_db_id ORDER BY c.courier_db_id LIMIT {ipl}'358 #print(f"display size = {disp_size}")359 #print(f"display size = {rows}")360 def create_spaces(string_to_space:str = "Some Default String", space_count:int = 30, just_return:int = None):361 if just_return:362 return_spaces = ""363 for _ in range(just_return):364 return_spaces += " "365 return(return_spaces)366 else:367 to_space = len(string_to_space)368 final_space_count = space_count - to_space 369 string_of_spaces = ""370 for x in range(final_space_count):371 string_of_spaces += " "372 return(string_of_spaces)373 while want_more_print: 374 #query = query375 result = get_from_db(query)376 if dont_print:377 pass378 else:379 print(f"Courier {create_spaces(just_return=23)} Order Status{create_spaces(just_return=7)} Order Delivery{create_spaces(just_return=37)} Customer Contact")380 print(f"{fm.print_dashes(return_it=True)} {create_spaces(just_return=8)} {fm.print_dashes(amount_of_dashes=5, return_it=True)} {create_spaces(just_return=6)} {fm.print_dashes(return_it=True)} {create_spaces(just_return=28)} {fm.print_dashes(amount_of_dashes=7, return_it=True)}")381 for order_info in result:382 def none_order_status(is_it_none:str):383 if is_it_none is None:384 return("No Bueno")385 else:386 return(is_it_none)387 def dont_print_none(is_it_none:str, opt=4):388 if is_it_none is None and opt == 1:389 return("X")390 elif is_it_none is None and opt == 2:391 return("Nobody")392 elif is_it_none is None and opt == 3:393 return("Nowhere")394 elif is_it_none is None and opt == 5:395 return("zilch")396 elif is_it_none is None:397 return(" ")398 else:399 return(is_it_none)400 401 display1 = f"[ {order_info[5]} ] {order_info[6]}"402 display2 = f"{none_order_status(order_info[4])}"403 display3 = f"{dont_print_none(order_info[3], 1)} {dont_print_none(order_info[0], 2)} {dont_print_none(make_address_readable(order_info[1]), 3)}" 404 spaces1 = create_spaces(display1, 32)405 spaces2 = create_spaces(display2, 18)406 spaces3 = create_spaces(display3, 35)407 if dont_print:408 pass409 else:410 print(f"#{order_info[6]} {order_info[5]} {spaces1} {none_order_status(order_info[4])} {spaces2} Order #{dont_print_none(order_info[3], 1)} For {dont_print_none(order_info[0], 2)} - @ {dont_print_none(make_address_readable(order_info[1]), 3)} {spaces3} £{dont_print_none(order_info[2], 5)}") 411 if dont_print:412 pass413 else:414 print(f"{fm.print_dashes(return_it=True)} {create_spaces(just_return=8)} {fm.print_dashes(amount_of_dashes=5, return_it=True)} {create_spaces(just_return=6)} {fm.print_dashes(return_it=True)} {create_spaces(just_return=28)} {fm.print_dashes(amount_of_dashes=7, return_it=True)}")415 print("") 416 #print(pages_display)417 page_numbs = [f"{x+1}" for x in range(total_pages)]418 #print(f"current_page = {current_page}") 419 #print(f"page_numbs = {page_numbs}") 420 421 highlight_page = lambda h : f"[[ {h} ]]" if h == str(current_page) else f"[ {h} ]" # language is so mad wtf 422 print(*[highlight_page(p) for p in page_numbs])423 print("") 424 paginate_action = (input("Enter A Page Number Or 0 To Quit : "))425 #print(current_page)426 #print(final_page)427 #print(paginate_action)428 fm.print_dashes()429 g = (int(paginate_action))430 if g > final_page:431 fm.format_display(disp_size)432 cour.return_one_line_art()433 current_page = int(paginate_action)434 dont_print = True435 elif paginate_action != "0":436 fm.format_display(disp_size)437 current_page = int(paginate_action)438 def if_minus(x):439 if x < 0:440 return(0)441 else:442 #print(f"IM RETURNING X = {x}")443 return(x)444 if paginate_action == "1":445 query = f'SELECT o.customer_name, o.customer_address, o.order_price, o.order_id, o.order_status, c.name, c.courier_db_id FROM orders o RIGHT JOIN couriers c ON o.courier_id = c.courier_db_id ORDER BY c.courier_db_id LIMIT {ipl}'446 elif paginate_action == "2":447 query = f'SELECT o.customer_name, o.customer_address, o.order_price, o.order_id, o.order_status, c.name, c.courier_db_id FROM orders o RIGHT JOIN couriers c ON o.courier_id = c.courier_db_id ORDER BY c.courier_db_id LIMIT {ipl}, {ipl}'448 else:449 query = f'SELECT o.customer_name, o.customer_address, o.order_price, o.order_id, o.order_status, c.name, c.courier_db_id FROM orders o RIGHT JOIN couriers c ON o.courier_id = c.courier_db_id WHERE c.courier_db_id > 2 ORDER BY c.courier_db_id LIMIT {if_minus((current_page-2)*ipl-1)}, {ipl}'450 dont_print = False451 else:452 #fm.format_display(disp_size)453 print("Ok Bye")454 want_more_print = False455############################################################################################################################################456def get_total_orders_for_courier(chosen_courier:int):457 chosen_courier += 1458 query = f"SELECT o.order_status FROM orders o INNER JOIN couriers c ON o.courier_id = c.courier_db_id WHERE c.courier_db_id = {chosen_courier} ORDER BY c.courier_db_id"459 result = get_from_db(query)460 return(len(result))461def get_completed_orders_for_courier(chosen_courier:int):462 chosen_courier += 1463 query = f"SELECT o.order_status FROM orders o INNER JOIN couriers c ON o.courier_id = c.courier_db_id WHERE c.courier_db_id = {chosen_courier} AND (o.order_status = 3 OR o.order_status = 4 OR o.order_status = 5) ORDER BY c.courier_db_id"464 result = get_from_db(query)465 return(len(result))466def db_print_courier_for_search(disp_size: int=22, rows = 3):467 usable_screen = int(disp_size) - 10468 ipl = usable_screen469 length_of_couriers = get_from_db(f'SELECT c.courier_db_id, c.name, c.location, c.phone_number FROM couriers c')470 length_of_couriers = len(length_of_couriers)471 total_pages = int(length_of_couriers / ipl)472 if (length_of_couriers % ipl) != 0:473 total_pages += 1474 final_page = len(range(total_pages))475 print(f"FINAL PAGE : {final_page}")476 print(f"Total Pages = {total_pages}")477 pages_display = [f"[ {x+1} ]" for x in range(total_pages)]478 pages_display = " - ".join(pages_display)479 fm.format_display(disp_size)480 want_more_print = True481 current_page = 1482 dont_print = False483 query = f'SELECT c.courier_db_id, c.name, c.location, c.phone_number FROM couriers c LIMIT {ipl}'484 #print(f"display size = {disp_size}")485 #print(f"display size = {rows}")486 def create_spaces(string_to_space:str = "Some Default String", space_count:int = 30, just_return:int = None):487 if just_return:488 return_spaces = ""489 for _ in range(just_return):490 return_spaces += " "491 return(return_spaces)492 else:493 to_space = len(string_to_space)494 final_space_count = space_count - to_space 495 string_of_spaces = ""496 for x in range(final_space_count):497 string_of_spaces += " "498 return(string_of_spaces)499 while want_more_print: 500 #query = query501 result = get_from_db(query)502 if dont_print:503 pass504 else:505 print(f"Courier [ID/Name]{create_spaces(just_return=14)} Location{create_spaces(just_return=11)} Contact Number{create_spaces(just_return=6)}Active Orders{create_spaces(just_return=4)}All Orders {create_spaces(just_return=5)} Finalised Orders")506 print(f"{fm.print_dashes(return_it=True)} {create_spaces(just_return=8)} {fm.print_dashes(amount_of_dashes=5, return_it=True)} {create_spaces(just_return=6)} {fm.print_dashes(amount_of_dashes=5, return_it=True)} {create_spaces(just_return=6)} {fm.print_dashes(amount_of_dashes=4, return_it=True)} {create_spaces(just_return=5)} {fm.print_dashes(amount_of_dashes=4, return_it=True)} {create_spaces(just_return=5)} {fm.print_dashes(amount_of_dashes=6, return_it=True)}")507 for order_info in result:508 # where dont print none was btw509 510 live_ords = get_live_orders_for_courier(order_info[0] - 1)511 total_ords = get_total_orders_for_courier(order_info[0] - 1)512 comp_ords = get_completed_orders_for_courier(order_info[0] - 1)513 display1 = f"#{order_info[0]} - {order_info[1]}"514 display2 = f"{order_info[2]}"515 display3 = f"{order_info[3]}"516 display4 = f"{live_ords} Live"517 display5 = f"{total_ords} Total"518 spaces1 = create_spaces(display1, 29)519 spaces2 = create_spaces(display2, 18)520 spaces3 = create_spaces(display3, 17)521 spaces4 = create_spaces(display4, 13)522 spaces5 = create_spaces(display5, 13)523 if dont_print:524 pass525 else:526 print(f"{order_info[0]}. - {order_info[1]} {spaces1} {order_info[2]} {spaces2} 0{order_info[3]} {spaces3} {live_ords} - Live {spaces4} {total_ords} - Total {spaces5} {comp_ords} - Done") 527 if dont_print:528 pass529 else:530 print(f"{fm.print_dashes(return_it=True)} {create_spaces(just_return=8)} {fm.print_dashes(amount_of_dashes=5, return_it=True)} {create_spaces(just_return=6)} {fm.print_dashes(amount_of_dashes=5, return_it=True)} {create_spaces(just_return=6)} {fm.print_dashes(amount_of_dashes=4, return_it=True)} {create_spaces(just_return=5)} {fm.print_dashes(amount_of_dashes=4, return_it=True)} {create_spaces(just_return=5)} {fm.print_dashes(amount_of_dashes=6, return_it=True)}")531 print("") 532 # print(pages_display)533 page_numbs = [f"{x+1}" for x in range(total_pages)]534 # print(f"current_page = {current_page}") 535 # print(f"page_numbs = {page_numbs}") 536 537 highlight_page = lambda h : f"[[ {h} ]]" if h == str(current_page) else f"[ {h} ]" # language is so mad wtf 538 print(*[highlight_page(p) for p in page_numbs])539 print("") 540 paginate_action = (input("Select Page Number Then Use 0 To Search By Courier : "))541 # print(current_page)542 # print(final_page)543 # print(paginate_action)544 fm.print_dashes()545 g = (int(paginate_action))546 if g > final_page:547 fm.format_display(disp_size)548 cour.return_one_line_art()549 current_page = int(paginate_action)550 dont_print = True551 elif paginate_action != "0":552 fm.format_display(disp_size)553 current_page = int(paginate_action)554 query = f'SELECT c.courier_db_id, c.name, c.location, c.phone_number FROM couriers c LIMIT {(current_page-1) * ipl}, {ipl}'555 dont_print = False556 else: 557 # could totally print again on 'exit' for courier number select but meh is fine for now558 fm.format_display(disp_size)559 query = f'SELECT c.courier_db_id, c.name, c.location, c.phone_number FROM couriers c LIMIT {(current_page-1) * ipl}, {ipl}'560 result = get_from_db(query)561 print(f"Courier [ID/Name]{create_spaces(just_return=14)} Location{create_spaces(just_return=11)} Contact Number{create_spaces(just_return=6)}Active Orders{create_spaces(just_return=4)}All Orders {create_spaces(just_return=5)} Finalised Orders")562 print(f"{fm.print_dashes(return_it=True)} {create_spaces(just_return=8)} {fm.print_dashes(amount_of_dashes=5, return_it=True)} {create_spaces(just_return=6)} {fm.print_dashes(amount_of_dashes=5, return_it=True)} {create_spaces(just_return=6)} {fm.print_dashes(amount_of_dashes=4, return_it=True)} {create_spaces(just_return=5)} {fm.print_dashes(amount_of_dashes=4, return_it=True)} {create_spaces(just_return=5)} {fm.print_dashes(amount_of_dashes=6, return_it=True)}")563 for order_info in result:564 live_ords = get_live_orders_for_courier(order_info[0] - 1)565 total_ords = get_total_orders_for_courier(order_info[0] - 1)566 comp_ords = get_completed_orders_for_courier(order_info[0] - 1)567 display1 = f"#{order_info[0]} - {order_info[1]}"568 display2 = f"{order_info[2]}"569 display3 = f"{order_info[3]}"570 display4 = f"{live_ords} Live"571 display5 = f"{total_ords} Total"572 spaces1 = create_spaces(display1, 29)573 spaces2 = create_spaces(display2, 18)574 spaces3 = create_spaces(display3, 17)575 spaces4 = create_spaces(display4, 13)576 spaces5 = create_spaces(display5, 13)577 print(f"{order_info[0]}. - {order_info[1]} {spaces1} {order_info[2]} {spaces2} 0{order_info[3]} {spaces3} {live_ords} - Live {spaces4} {total_ords} - Total {spaces5} {comp_ords} - Done") 578 print(f"{fm.print_dashes(return_it=True)} {create_spaces(just_return=8)} {fm.print_dashes(amount_of_dashes=5, return_it=True)} {create_spaces(just_return=6)} {fm.print_dashes(amount_of_dashes=5, return_it=True)} {create_spaces(just_return=6)} {fm.print_dashes(amount_of_dashes=4, return_it=True)} {create_spaces(just_return=5)} {fm.print_dashes(amount_of_dashes=4, return_it=True)} {create_spaces(just_return=5)} {fm.print_dashes(amount_of_dashes=6, return_it=True)}")579 print("")580 want_more_print = False581 582def db_join_by_chosen_courier(disp_size=22, rows=3): # here right join will give you all the couriers even if they dont have an order which is kinda kewl583 Orders.items_per_list_print_couriers(disp_size, rows)584 fm.print_dashes()585 chosen_courier = int(input("Enter Courier Number To Search Their Orders : "))586 # confirm its in range here tbf587 query = f"SELECT o.customer_name, o.order_price, o.order_id, o.order_status, c.courier_db_id FROM orders o INNER JOIN couriers c ON o.courier_id = c.courier_db_id WHERE courier_db_id = {chosen_courier} ORDER BY c.courier_db_id"588 result = get_from_db(query)589 for a_result in result:590 print(a_result)591 fm.fake_input()592# NEW ################################################################################################################593def db_join_by_chosen_courier_then_status(disp_size=22, rows=3): # then guess could do if init display list too big just show live, then else show something else... (or just paginate from the start duh!)594 fm.format_display(disp_size)595 print(f"Some Title Info\n{fm.print_dashes(return_it=True)}\n")596 #Orders.items_per_list_print_couriers(disp_size, rows)597 #598 #599 db_print_courier_for_search(disp_size, rows)600 #601 #602 chosen_courier = int(input("Enter Courier Number To Search Their Orders : "))603 # confirm its in range here tbf604 fm.print_dashes()605 query = f"SELECT o.customer_name, o.order_price, o.order_id, o.order_status, c.courier_db_id FROM orders o INNER JOIN couriers c ON o.courier_id = c.courier_db_id WHERE c.courier_db_id = {chosen_courier} ORDER BY c.courier_db_id"606 go_again = True607 chosen_status = 0608 max_length_of_all_couriers_orders = len(get_from_db(query)) # if this is too long change the query and obvs ensure it links up properly throughout that page is changed n shit609 def create_spaces(string_to_space:str = "DFLT", space_count:int = 30, just_return:int = None):610 if just_return:611 return_spaces = ""612 for _ in range(just_return):613 return_spaces += " "614 return(return_spaces)615 else:616 to_space = len(string_to_space)617 final_space_count = space_count - to_space 618 string_of_spaces = ""619 for x in range(final_space_count):620 string_of_spaces += " "621 return(string_of_spaces)622 while go_again:623 fm.format_display(disp_size)624 if chosen_status == 9:625 query = f"SELECT o.customer_name, o.order_price, o.order_id, o.order_status, c.courier_db_id FROM orders o INNER JOIN couriers c ON o.courier_id = c.courier_db_id WHERE c.courier_db_id = {chosen_courier} ORDER BY c.courier_db_id"626 elif chosen_status != 0:627 query = f"SELECT o.customer_name, o.order_price, o.order_id, o.order_status, c.courier_db_id FROM orders o INNER JOIN couriers c ON o.courier_id = c.courier_db_id WHERE c.courier_db_id = {chosen_courier} AND o.order_status = {chosen_status} ORDER BY c.courier_db_id"628 #fm.format_display(disp_size)629 result = get_from_db(query)630 def print_full_key_with_caps_for_current():631 status_dict = {1:"preparing", 2:"delivering", 3:"delivered", 4:"recieved", 5:"cancelled", 6:"scheduling", 9:"all"}632 print(f"Order Status Key\n{fm.print_dashes(return_it=True)}")633 if_current_make_bold = lambda x,y : f" {x.upper()} <<" if y == chosen_status or y == 0 else x634 for num, status in status_dict.items():635 print(f"[ {num} ] {if_current_make_bold(status.title(), num)}")636 def status_num_to_word(statnum:int):637 status_dict = {1:"preparing", 2:"delivering", 3:"delivered", 4:"recieved", 5:"cancelled", 6:"scheduling", 0:"all", 9:"all"}638 rv = status_dict.get(statnum)639 return(rv.title())640 641 #print(chosen_status)642 print_full_key_with_caps_for_current()643 #print(f"Order Status Key\n{fm.print_dashes(return_it=True)}\n[ 1 ] Preparing\n[ 2 ] Out For Delivery\n[ 3 ] Delivered\n[ 4 ] Recieved\n[ 5 ] Cancelled\n[ 6 ] Scheduling\n[ 9 ] All")644 print("")645 print(f"{status_num_to_word(chosen_status)} Orders") 646 print(f"Courier #{chosen_courier} - {get_couriers_name_from_id(chosen_courier)}") 647 print("") 648 fm.print_dashes()649 for order_info in result:650 display1 = f"#{order_info[2]} - {order_info[0]}"651 display2 = f"{order_info[3]}"652 display3 = f"{order_info[1]}"653 spaces1 = create_spaces(display1, 29)654 spaces2 = create_spaces(display2, 18)655 spaces3 = create_spaces(display3, 17)656 print(f"{order_info[2]} - {order_info[0]} {spaces1} {order_info[3]} {spaces2} £{order_info[1]} {spaces3}")657 if len(result) <= max_length_of_all_couriers_orders - 1:658 the_diff_to_add_lines = (max_length_of_all_couriers_orders - len(result))659 if len(result) == 0:660 the_diff_to_add_lines -= 1661 for _ in range(the_diff_to_add_lines):662 print("-")663 #x = ["\n" for _ in range(the_diff_to_add_lines)] probs needs minus 1 too if you wanna do it 664 #print(*x)665 666 if len(result) == 0:667 print(return_one_line_art())668 fm.print_dashes()669 print("")670 #print("If You Would Like To See Orders For A Specific Order Status Please Enter The Order Code")671 chosen_status = int(input("Enter A Status Number To Filter By It, OR 9 To View All, OR 0 To Exit : ")) # will type error if not an int so do convert outside and then can work it with below if statement672 if chosen_status != 1 and chosen_status != 2 and chosen_status != 3 and chosen_status != 4 and chosen_status != 5 and chosen_status != 6 and chosen_status != 9 and chosen_status != 0: #make it 9?673 chosen_status = 9674 # confirm its in range here?675 if (chosen_status != 0):676 print("And Go Again") # lol remove this knobhead677 else:678 print("Well End Function Then")679 go_again = False680 break681 682#####################################################################################################################################683def db_print_search_by_status(disp_size=22, rows=3): # then guess could do if init display list too big just show live, then else show something else... (or just paginate from the start duh!)684 fm.format_display(disp_size)685 print(f"Some Title Info\n{fm.print_dashes(return_it=True)}\n")686 # chosen_courier = int(input("Enter Courier Number To Search Their Orders : "))687 # confirm its in range here tbf688 fm.print_dashes()689 query = f"SELECT o.customer_name, o.order_price, o.order_id, o.order_status, o.customer_address, c.courier_db_id FROM orders o INNER JOIN couriers c ON o.courier_id = c.courier_db_id ORDER BY c.courier_db_id"690 go_again = True691 chosen_status = 0692 max_length_of_all_couriers_orders = len(get_from_db(query)) # if this is too long change the query and obvs ensure it links up properly throughout that page is changed n shit693 def create_spaces(string_to_space:str = "DFLT", space_count:int = 30, just_return:int = None):694 if just_return:695 return_spaces = ""696 for _ in range(just_return):697 return_spaces += " "698 return(return_spaces)699 else:700 to_space = len(string_to_space)701 final_space_count = space_count - to_space 702 string_of_spaces = ""703 for x in range(final_space_count):704 string_of_spaces += " "705 return(string_of_spaces)706 while go_again:707 fm.format_display(disp_size)708 if chosen_status == 9:709 query = f"SELECT o.customer_name, o.order_price, o.order_id, o.order_status, o.customer_address, c.courier_db_id FROM orders o INNER JOIN couriers c ON o.courier_id = c.courier_db_id ORDER BY c.courier_db_id"710 elif chosen_status != 0:711 query = f"SELECT o.customer_name, o.order_price, o.order_id, o.order_status, o.customer_address, c.courier_db_id FROM orders o INNER JOIN couriers c ON o.courier_id = c.courier_db_id AND o.order_status = {chosen_status} ORDER BY c.courier_db_id"712 #fm.format_display(disp_size)713 result = get_from_db(query)714 def print_full_key_with_caps_for_current():715 status_dict = {1:"preparing", 2:"delivering", 3:"delivered", 4:"recieved", 5:"cancelled", 6:"scheduling", 9:"all"}716 print(f"Order Status Key\n{fm.print_dashes(return_it=True)}")717 if_current_make_bold = lambda x,y : f" {x.upper()} <<" if y == chosen_status or y == 0 else x718 for num, status in status_dict.items():719 if num == chosen_status:720 print(f"{Fore.GREEN}[ {num} ] {if_current_make_bold(status.title(), num)}")721 else:722 print(f"{Fore.LIGHTBLACK_EX}[ {num} ] {if_current_make_bold(status.title(), num)}")723 def status_num_to_word(statnum:int):724 status_dict = {1:"preparing", 2:"out for delivery", 3:"delivered", 4:"recieved", 5:"cancelled", 6:"scheduling", 0:"all", 9:"all"}725 rv = status_dict.get(statnum)726 return(rv.title())727 728 # would be nice to fix for very first display but meh is ok dw729 print_full_key_with_caps_for_current()730 #print(f"Order Status Key\n{fm.print_dashes(return_it=True)}\n[ 1 ] Preparing\n[ 2 ] Out For Delivery\n[ 3 ] Delivered\n[ 4 ] Recieved\n[ 5 ] Cancelled\n[ 6 ] Scheduling\n[ 9 ] All")731 print("")732 print(f"Order Status : {status_num_to_word(chosen_status)} Orders [ {chosen_status} ]") 733 fm.print_dashes()734 print("\nOrder Number & Customer Customer Address Order Cost Current Status")735 print(f"{fm.print_dashes(amount_of_dashes=10, return_it=True)} {fm.print_dashes(amount_of_dashes=8, return_it=True)} {fm.print_dashes(amount_of_dashes=4, return_it=True)} {fm.print_dashes(amount_of_dashes=4, return_it=True)}")736 for order_info in result:737 display1 = f"#{order_info[2]} - {order_info[0]}"738 display2 = f"{make_address_readable(order_info[4])}"739 display3 = f"{order_info[1]}"740 spaces1 = create_spaces(display1, 31)741 spaces2 = create_spaces(display2, 20)742 spaces3 = create_spaces(display3, 12)743 print(f"{order_info[2]} - {order_info[0]} {spaces1} -> {make_address_readable(order_info[4])} {spaces2} £{order_info[1]} {spaces3} {order_info[3]}")744 if len(result) <= max_length_of_all_couriers_orders - 1:745 the_diff_to_add_lines = (max_length_of_all_couriers_orders - len(result))746 if len(result) == 0:747 the_diff_to_add_lines -= 1748 for _ in range(the_diff_to_add_lines):749 print("-")750 #x = ["\n" for _ in range(the_diff_to_add_lines)] probs needs minus 1 too if you wanna do it 751 #print(*x)752 753 if len(result) == 0:754 print(return_one_line_art())755 fm.print_dashes()756 print("")757 #print("If You Would Like To See Orders For A Specific Order Status Please Enter The Order Code")758 chosen_status = int(input("Enter A Status Number To Filter By It, OR 9 To View All, OR 0 To Exit : ")) # will type error if not an int so do convert outside and then can work it with below if statement759 if chosen_status != 1 and chosen_status != 2 and chosen_status != 3 and chosen_status != 4 and chosen_status != 5 and chosen_status != 6 and chosen_status != 9 and chosen_status != 0: #make it 9?760 chosen_status = 9761 # confirm its in range here?762 if (chosen_status != 0):763 print("And Go Again") # lol remove this knobhead764 else:765 print("Well End Function Then")766 go_again = False767 break768 769#####################################################################################################################################770def db_join_by_courier_only_live_orders(): # here right join will give you all the couriers even if they dont have an order which is kinda kewl771 query = f"SELECT o.customer_name, o.order_price, o.order_id, o.order_status, c.name, c.courier_db_id FROM orders o INNER JOIN couriers c ON o.courier_id = c.courier_db_id WHERE o.order_status = 1 OR o.order_status = 2 ORDER BY c.courier_db_id"772 #ORDER BY...773 result = get_from_db(query)774 for a_result in result:775 print(a_result)776 fm.fake_input()777def get_live_orders_for_courier(chosen_courier:int):778 chosen_courier += 1779 query = f"SELECT o.order_status FROM orders o INNER JOIN couriers c ON o.courier_id = c.courier_db_id WHERE c.courier_db_id = {chosen_courier} AND (o.order_status = 1 OR o.order_status = 2) ORDER BY c.courier_db_id"780 result = get_from_db(query)781 return(len(result))782 # OBVS JUST QUICKLY DO ONLY LIVE ORDERS FOR ALL COURIERS WILL BE EASY AF783 # SOME FORMAT ISSUES AND WANT TO DISPLAY THE STATUS PROPERLY (tbf idk how it displays rn lol)784 # THEN STRAIGHT AFTER DO ONLY LIVE ORDERS, THEN CONT - or option to input the code and it shows you for the code only ooo785## CREATE NEW ORDER FUNCTIONS ##############################################################################################################################################786def create_new_order(disp_size, rows): 787 """Create new order by calling appropriate functions (for vaildation) then creating new instance with the validated inputs"""788 lets_continue = True789 fm.format_display(disp_size)790 while lets_continue:791 # GET NAME792 name = get_name(disp_size, True)793 if name == "0":794 print("Escape Key Logged")795 lets_continue = False796 797 # GET NUMBER798 if lets_continue:799 phone_number = get_mobile(disp_size, name)800 if phone_number == "0":801 print("Escape Key Logged")802 lets_continue = False803 # GET ADDRESS804 if lets_continue:805 customer_address = get_address(disp_size, name, phone_number)806 if customer_address == "0":807 print("Escape Key Logged")808 lets_continue = False809 # GET PRODUCTS FOR ORDER, UPDATE QUANTITIES, & GET FINAL PRICE810 if lets_continue:811 order_prdcts, order_cost = get_and_add_products(disp_size, rows)812 if order_cost == "0":813 print("Escape Key Logged")814 lets_continue = False815 # GET COURIER816 if lets_continue:817 attached_courier = get_couriers_from_list_and_attach(disp_size, rows, name, phone_number, customer_address)818 if attached_courier == "0":819 print("Escape Key Logged")820 lets_continue = False821 # GET ORDER STATUS822 if lets_continue:823 order_status = add_order_status(None)824 if order_status == "0":825 print("Escape Key Logged")826 lets_continue = False827 else:828 break829 830 else:831 fm.print_dashes()832 print("Order Cancelled")833 fm.fake_input()834 return(False) # escape key logged, order cancelled, dont allow quick create by returning false (false - order not made)835 fm.print_dashes()836 # MAKE THE ORDER837 Orders(name, customer_address, phone_number, order_status, order_cost, None, attached_courier, order_prdcts)838 ord_uwuid = Orders.orders_list[-1].order_id839 # some kinda confirm before adding it to the db - like try except for the class init method or sumnt maybe idk?840 add_new_order_to_db(name, customer_address, phone_number, order_status, order_cost, ord_uwuid, attached_courier, order_prdcts)841 fm.format_display(disp_size)842 843 print(f"{Fore.GREEN}This Was A Triumph! - Order Made")844 fm.print_dashes()845 print(f"ORDER CONFIRMED")846 fm.print_dashes()847 print(f"Order ID (for your records) : {ord_uwuid}") #else use get attr848 fm.print_dashes()849 print(f"You've Successfully Been Charged {Fore.GREEN}£{order_cost}")850 fm.print_dashes()851 fm.fake_input()852 return(True) # if made new (true - order made succesfully)853def add_new_order_to_db(customer_name:str, customer_address:str, customer_phone:str, order_status:str, order_price:float, order_id:int, courier_id:int, products_ids:str):854 print(products_ids)855 query = f'INSERT INTO orders (customer_name, customer_address, customer_phone, order_status, order_price, order_uuid, courier_id, products_ids) VALUES ("{customer_name}","{customer_address}","{customer_phone}","{order_status}",{order_price},{order_id},{courier_id},"{products_ids}")'856 add_to_db(query)857def get_name(disp_size, first_run = False): # v2 validation = needs try except to be acceptable858 """Get and return name of courier with simple regex validation, not used for update but should refactor for this?"""859 invalid_name = True860 # name_is_good = lambda x : re.match(r"[A-Za-z]{2,25}|\s|\.|[a-zA-Z]|[A-Za-z]{1,25}\w$", x) # actually wrote this myself so do think it works but also i mean fuck knows \D[a-zA-Z]+($\s{1}|.)861 while invalid_name:862 fm.format_display(disp_size)863 if first_run: print(f"Create New Order\n{fm.print_dashes(return_it=True)}\n")864 name = input("Enter Name (need to redo regex validation) : ")865 if name == "0":866 break867 elif len(name) >= 3: # name_is_good(name):868 print(f"Name [{name}] Validated")869 break870 else:871 print("Try Again")872 return(name)873def get_mobile(disp_size, name:str = None): # v2 validation = needs try except to be acceptable874 invalid_mob = True875 mob_is_good = lambda x : re.match(r"^(07\d{8,12}|447\d{7,11})$", x) # lambda lets us keep expression outside of loop, allows 07 start or 447 start but not +876 while invalid_mob:877 fm.format_display(disp_size, True)878 if name: # not equal to none?879 print(f"Name : {name}\n{fm.print_dashes(return_it=True)}\n") # else print dashes?880 num = input("Enter Valid UK Mobile Number (no + symbol) : ")881 if num == "0":882 break883 elif len(num) > 1: #mob_is_good(num): (put it back, is just too long when testing frequently)884 print("Number Validated")885 break886 else:887 print("Try Again")888 return(num)889def get_address(disp_size, name=None, phone_number=None): # v2 validation = needs try except to be acceptable890 """Get and return name of courier with simple regex validation, not used for update but should refactor for this?"""891 # COULD LEGIT DO IF ADDRESS CONTAINS VALID LOCATION ONLY SHOW VALID COURIERS, ELSE SHOW ALL892 invalid_addy = True893 # name_is_good = lambda x : re.match(r"[A-Za-z]{2,25}|\s|\.|[a-zA-Z]|[A-Za-z]{1,25}\w$", x) 894 while invalid_addy:895 fm.format_display(disp_size)896 if name is not None and phone_number is not None: # when comparing to None always use comparison not equality 897 print(f"{fm.print_dashes(return_it=True)}\nName : {name}\nMobile : {phone_number}\n{fm.print_dashes(return_it=True)}\n")898 address = input("Enter Address : ")899 if address == "0":900 break901 elif len(address) >= 5: # name_is_good(name):902 print(f"[{address}] Validated")903 break904 else:905 print("Try Again")906 return(address)907def get_couriers_from_list_and_attach(disp_size, rows, name=None, phone_number=None, customer_address=None): # if take locations can create new function to update them (add/remove/rename), v3 validation = acceptable908 # print the couriers list, to improve obvs909 fm.format_display(disp_size)910 911 # main functionality912 not_cancelled = True913 while not_cancelled:914 # if name and phone_number and customer_address: (should work the same, test it)915 if name is not None and phone_number is not None and customer_address is not None: # when comparing to None always use comparison not equality 916 print(f"{fm.print_dashes(return_it=True)}\nName : {name}\nMobile : {phone_number}\nAddress : {customer_address}\n{fm.print_dashes(return_it=True)}\n") #might remove here 917 Orders.items_per_list_print_couriers(disp_size, rows)918 print("")919 try:920 user_input = int(input("Enter Your Selection : "))921 except ValueError:922 wrong_val = True923 if user_input > len(cour.Couriers.couriers_list) or user_input < 0: # error cases924 if wrong_val:925 print("Try Again - Wrong Value\n")926 else:927 print("Try Again - Wrong Selection\n")928 elif user_input == 0: # escape key, its already been int converted btw929 print("Uhhhh... I HATE zer0s") #... but you are zeros tho930 break931 else:932 fm.format_display(disp_size)933 fm.print_dashes()934 cr = cour.Couriers.couriers_list[user_input - 1] 935 print(f"You Selected Courier - {cr.name}, (ID : {cr.courier_id})")936 fm.print_dashes()937 print("Assigning This Courier To The Order Now") 938 break939 # END WHILE940 if user_input == 0:941 return ("0")942 else:943 return(user_input) #is an int944 945def get_and_add_products(disp_size, rows, basket_total:float = 0.0, order_basket:list = []):946 got_more = True947 while got_more:948 prdct.Product.paginated_print(prdct.Product, disp_size, rows, "Enter Page (step with .), To Select A Product First Hit '0' : ","Choose A Product To Add To The Order")949 product_to_add = int(input("Enter Product Number To Add It To The Order : "))950 prod = prdct.Product.products_list[product_to_add - 1]951 prod_price = getattr(prod, "price_gbp")952 prod_name = getattr(prod, "name")953 prod_numb = getattr(prod, "product_number")954 prod_quant = getattr(prod, "quantity")955 if int(prod_quant) == 0:956 print("Try Again - Reloop!") # TO DO THIS, is easy af ffs just cba957 else:958 fm.format_display(disp_size)959 print(f"Sure You Want To Add [{prod_numb}] - {prod_name} To The Order?\nPrice = £{prod_price}")960 print("Commit Confirm - assuming yes")961 fm.print_dashes()962 print("Ok how many?")963 quant_is_good = True # DUH SHIT LIKE THIS AS FUNCTION!!!964 while quant_is_good:965 how_many = int(input(f"Enter Amount ({prod_quant} available) : ")) 966 if how_many > prod_quant:967 print("Something Doesn't Quite Add Up... Try Again")968 fm.fake_input()969 break 970 order_basket = product_basket(prod_numb, prod_name, prod_price, how_many, order_basket)971 total_basket_quant = 0972 for order in order_basket:973 total_basket_quant += order[3]974 print(f"Order Basket = {order_basket}")975 basket_total += order_basket[-1][2] #last item added, the final price (multiplied by wanted quantity)976 fm.format_display(disp_size)977 print(f"Current Basket\n{fm.print_dashes(return_it=True)}")978 print(f"Price : £{basket_total:.2f}")979 print(f"Items : {total_basket_quant} total") # should really use the sum/count of how_many instead of len basket div 5 but meh980 fm.print_dashes()981 for product in order_basket:982 print(f"{product[3]}x {product[1]}(#{product[0]}) @ £{(product[2] / product[3]):.2f} each - [£{product[2]} total]")983 # commit confirm984 fm.print_dashes()985 yesno = fm.get_user_yes_true_or_no_false(before_text=f"Want To Add More Items\n{fm.print_dashes(return_it=True)}\n", yes="Order More",no="I'm Done - Start Checkout")986 fm.fake_input() # DELETE THIS RIGHT?!?!?!!?987 if yesno == False:988 got_more = False989 break990 break991 # NESTED FUNCTION 992 def display_updated_basket(order_basket, original_order_basket):993 print(f"Looks Like Some Items Sold Out!\nWe've Updated Your Basket\nPlease Reconfirm Your Order\n{fm.print_dashes()}\nUpdated Basket\n{fm.print_dashes()}") # THIS WILL BE THE NEW UPDATED FINAL DISPLAY994 for product in order_basket:995 print(f"{product[3]}x {product[1]}(#{product[0]}) @ £{(product[2] / product[3]):.2f} each - [£{product[2]} total]")996 # END NESTED FUNCTION 997 print("Ok finalising your basket...")998 final_products_list = []999 original_order_basket = order_basket1000 order_basket = update_quants_from_basket(order_basket)1001 final_products_quants_list = []1002 if order_basket is None:1003 #print("order basket is none")1004 order_basket = original_order_basket1005 #print(f"replacing basket = {original_order_basket}")1006 #print(f"order basket is now = {order_basket}")1007 else:1008 display_updated_basket(order_basket, original_order_basket) #display updated basket and confirm function1009 for item_info in order_basket:1010 final_products_list.append(item_info[0]) # FOR STRICT GENERATION PROJECT1011 final_products_quants_list.append((item_info[0],item_info[3])) # NEW! -> TO RETURN 1012 print(f"Final Basket Total\n{fm.print_dashes(return_it=True)}")1013 new_basket_total = 01014 for product in order_basket:1015 new_basket_total += product[2]1016 basket_total = new_basket_total1017 #print(f"The Item Numbers Being Sent Are = {final_products_quants_list}")1018 if basket_total >= 12.99: 1019 print(f"£{basket_total:.2f}")1020 fm.print_dashes()1021 print("COMMIT CONFIRM?")1022 fm.fake_input()1023 return(final_products_quants_list, basket_total) 1024 else:1025 print("Your Basket Is Under £12.99")1026 fm.print_dashes()1027 print(f"£{basket_total:.2f}")1028 fm.print_dashes()1029 print("So We're Adding A £2.99 Delivery Charge")1030 fm.print_dashes()1031 print(f"[ 1 ] Accept\n[ 2 ] Cancel\n[ 3 ] Order More\n{fm.print_dashes(return_it=True)}") #(loop back to readd would be nice but urgh - maybe back out then back in but wouldnt save ur orders but meh)1032 y_n_or_more = input("Enter Your Selection : ")1033 1034 if y_n_or_more == "1": # ACCEPT1035 fm.format_display(disp_size)1036 fm.print_dashes()1037 print("Delivery Charge Added")1038 print("Basket Updated")1039 fm.print_dashes()1040 basket_total += 2.991041 print("")1042 fm.print_dashes()1043 print(f"{Fore.GREEN}£{basket_total:.2f}")1044 fm.print_dashes()1045 print("")1046 print("\(‾▿‾\) (/‾▿‾)/")1047 print("")1048 #print("Your Order Has Been Confirmed")1049 print(final_products_quants_list)1050 fm.fake_input()1051 return(final_products_quants_list, basket_total) # THE ACTUAL FUCK THO, MAYBE JUST RETURN ONE TUPLE OR LIST AND UNPACK THE VALUES THEN BUT IT WAS WORKING BEFORE SO WTF MAN!!!! AND IT FUCKING WORKS WITH ONE WHAT THE FUCKKKKKKKK1052 elif y_n_or_more == "2": # CANCEL1053 fm.print_dashes()1054 print("Sorry It Had To End Like This")1055 print("")1056 print("(・_・;)")1057 print("")1058 fm.print_dashes()1059 fm.fake_input()1060 return("0", "0") 1061 else: # ORDER MORE - so loop back1062 get_and_add_products(disp_size, rows, basket_total) 1063 # aite so believe this will work just sending them back here to the same function1064 # but that isn't going to save your basket or the cost of your basket1065 # but since we initialise those vars at the start defo is a work around1066 return(final_products_quants_list, basket_total)1067 1068 1069def product_basket(item_num_to_add:int, item_name_to_add:str, item_price_to_add:float, how_many:int, basket_list=None):1070 if basket_list is None:1071 basket_list = []1072 final_price_to_add = item_price_to_add * how_many # *= duh1073 basket_list.append([item_num_to_add, item_name_to_add, final_price_to_add, how_many])1074 print(f"Current Basket = {basket_list}")1075 return(basket_list)1076def update_quants_from_basket(order_basket):1077 made_updates = False1078 for i, item in enumerate(order_basket):1079 how_many = int(order_basket[i][3]) # might be unnecessary to force conversion here1080 final_quant = (prdct.Product.products_list[item[0]-1].quantity)-(how_many)1081 if final_quant < 0:1082 #print(f"Uh Oh, Looks Like We Need To {0 - final_quant} From {prdct.Product.products_list[item[0]-1].name}") 1083 #print(f"Updating User Basket - {order_basket}")1084 #print(f"Updating Item Total - {order_basket[i][3]} + {final_quant}")1085 order_basket[i][3] + final_quant1086 made_updates = True 1087 order_basket.pop(i) 1088 # if ok then commit the change and return the order (ONLY RETURN IF VALID THEN COULD CHECK IF IS NONE ON RETURN - ACTUALLY YES AS WANT TO CONFIRM ANY UPDATES WITH THE USER)1089 if made_updates == False:1090 prdct.Product.products_list[item[0]-1].quantity -= how_many1091 #print(f"Updated Quantity = {prdct.Product.products_list[item[0]-1].quantity}")1092 if made_updates:1093 #print("Made Updates")1094 return(order_basket) # new and necessary - must return as may update it now 1095 else:1096 return(None)1097 # test1098 # then print orders to make sure is working as expected, print via class btw!1099 # then 1100 # natty lang and rich lib1101 # and remaining functions/functionality1102 # then new server stuff1103 # also new web scrape betting project idea1104def add_order_status(the_code = None):1105 '''adds order code as status to an order'''1106 print_string = ["[ 1 ] = Preparing", "[ 2 ] = Out For Delivery", "[ 3 ] = Delivered", "[ 4 ] = Recieved", "[ 5 ] = Cancelled", "[ 6 ] = Scheduling"]1107 if the_code == None: # for update not add1108 print("Choose Status To Set To Order") # want order number? could be done easily enough1109 fm.print_dashes()1110 print(*print_string, sep="\n") # print("[ 7 ] = Custom Code") #not doing (rn anyways) but could 1111 fm.print_dashes()1112 user_code = int(input("Choose A Code For The Order : "))1113 else:1114 code_to_int = int(the_code)1115 user_code = code_to_int1116 if user_code == 1:1117 return("Preparing") 1118 elif user_code == 2:1119 return("Out For Delivery")1120 elif user_code == 3:1121 return("Delivered")1122 elif user_code == 4:1123 return("Recieved")1124 elif user_code == 5:1125 return("Cancelled")1126 elif user_code == 6:1127 return("Scheduling")1128 elif user_code == 0:1129 return("0") # escape key (tho when adding to db with no enum the status will be set to 0 which means error (which totally works tbf))1130 else:1131 return("Error") # guna return error for debugging but ig should return preparing as default1132## MAIN MENU ############################################################################################################################################################1133def main_orders(rows=3, disp_size=22):1134 #disp_size = 201135 #rows = 31136 add_spaces = lambda x : x # take each line and add spaces appropriately should be easy might need full def tho1137 menu_string = [f"ORDERS v3.01\n(using object oriented principles)\n{fm.print_dashes(return_it=True)}\n","[ 1 ] Create New", "[ 2 ] Print SubMenu", "[ 3 ] Format Display", "[ - ] -", "[ - ] -", "[ - ] -", "[ - ] -", "[ - ] -", "[ - ] -", "[ S ] -", "[ L ] -", "[ 0 ] Main Menu\n","- - - - - - - - - - -"]1138 user_menu_input = 11139 print_again = True1140 while user_menu_input != "0":1141 if print_again: # for quick menu, returns the user to their place in this switch statement without printing the menu again1142 # print menu, get user input1143 fm.format_display(disp_size)1144 print(*menu_string, sep="\n")1145 user_menu_input = input("Enter Menu Selection : ")1146 # [1] CREATE NEW ORDER1147 if user_menu_input == "1":1148 print("Create New Order")1149 if create_new_order(disp_size, rows):1150 # QUICK MENU / CREATE AGAIN1151 print("Quick Create New Order?\n") #from a ux perspective would be un-needed here, but for testing and general use its helpful so including1152 if fm.get_user_yes_true_or_no_false():1153 print_again = False1154 user_menu_input = "1"1155 else:1156 print_again = True1157 1158 # [2] PRINT SUBMENU1159 if user_menu_input == "2":1160 print("Print SubMenu")1161 print_sub_menu(disp_size, rows)1162 # [3] FORMAT SCREEEN1163 elif user_menu_input == "3":1164 disp_size = format_screen(disp_size)1165 fm.fake_input()1166 1167 return(rows, disp_size)1168def print_sub_menu(disp_size, rows):1169 menu_string = [f"ORDERS v3.01\n(using object oriented principles)\n{fm.print_dashes(return_it=True)}\n","[ 1 ] Print Orders List [DB, Paginated, Only Valid]", "[ 2 ] Print Orders By Courier [DB, Paginated, Inc None]", "[ 3 ] Search Orders By Courier [DB, Paginated, Only Valid]", "[ 4 ] Search Orders By Status [DB, Only Valid]", "[ 0 ] Back To Orders Menu\n","- - - - - - - - - - -"]1170 user_menu_input = 11171 while user_menu_input != "0":1172 fm.format_display(disp_size)1173 print(*menu_string, sep="\n")1174 user_menu_input = input("Enter Menu Selection : ")1175 1176 if user_menu_input == "1": # changed from if to elif, shouldnt cause problems, but commenting just incase1177 db_print_orders(disp_size, rows)1178 fm.fake_input()1179 elif user_menu_input == "2":1180 db_print_orders_for_every_courier(disp_size, rows)1181 elif user_menu_input == "3":1182 db_join_by_chosen_courier_then_status(disp_size, rows) #db_print_courier_for_search(disp_size, rows)1183 elif user_menu_input == "4":1184 db_print_search_by_status(disp_size, rows) #db_join_by_chosen_courier(disp_size, rows)1185def format_screen(disp_size:int): 1186 user_submenu_input = "1"1187 while user_submenu_input != "0":1188 hl_curr_disp = lambda x : f"{x}" if x != disp_size else f"{Fore.GREEN}{x} << CURRENT DISPLAY SIZE"1189 print(*[hl_curr_disp(x+9) for x in reversed(range(45))], sep="\n")1190 print(f"Current Display Size = {disp_size} < ENSURE THESE MATCH! -> Tip! use 0 to reset the display)") 1191 print("Recommended Display Size = 29+")1192 print("Recommended Minimum Display Size = 16") ## 16 gets 30 items comfortably (per line) without overlap or need for pagination so start here (15 had mad overlap)1193 fm.print_dashes()1194 print("Adjust The Display Then Enter [ 0 ] To Reset The Display Counter To See The Numbers")1195 print("The Very Top Number Will Be Your Display Size")1196 fm.print_dashes()1197 new_disp_size = int(input("Enter The Number For The Screen Size You Want : "))1198 if new_disp_size > 0: # should make this be greater than the acceptable size but whatever1199 print(f"New Screen Size = {new_disp_size}")1200 return(new_disp_size)1201 else:1202 user_submenu_input = "1"1203def set_display_rows(rows: int):1204 # to improve that just make the amount of spaces dynamic and trim the end of long strings to fit uniformly1205 # N0TE! - should trim the end of long strings anyway btw!1206 fm.format_display()1207 print("Choose How Many Columns To Display In Menus (1 - 3)")1208 print(f"Max Columns = 3, Recommended Columns = 3, Current = {rows})") # used to be 5 until price etc so updating for those changes tho no really tested, hardly worth it1209 rows = int(input("Enter A Number Between 1 and 3 : "))1210 return(rows)1211# can also use this for excepts/errors tbf lol1212def return_one_line_art():1213 one_line_ascii_art_list = ["̿' ̿'\̵͇̿̿\з=(◕_◕)=ε/̵͇̿̿/'̿'̿ ̿ NOBODY MOVE!","( ͡° ͜ʖ ͡°) *STARING INTENSIFIES*","(╯°□°)--︻╦╤─ - - - WATCH OUT HE'S GOT A GUN","(⌐■_■)--︻╦╤─ - - - GET DOWN MR PRESIDENT","┻━┻︵ \(°□°)/ ︵ ┻━┻ FLIPPIN DEM TABLES","(ノಠ益ಠ)ノ彡︵ ┻━┻︵ ┻━┻ NO TABLE IS SAFE","ʕつಠᴥಠʔつ ︵ ┻━┻ HIDE YO KIDS HIDE YO TABLES","(ಠ_ಠ)┌∩┐ BYE BISH","(ง •̀_•́)ง FIGHT ME FOKER!","[¬º-°]¬ [¬º-°]¬ ZOMBIES RUN!","(╭ರ_•́) CURIOUSER AND CURIOUSER","つ ◕_◕ ༽つ つ ◕_◕ ༽つ TAKE MY ENERGY","༼つಠ益ಠ༽つ ─=≡ΣO)) HADOUKEN!"]1214 return(one_line_ascii_art_list[random.randint(0, len(one_line_ascii_art_list)-1)])1215 1216if __name__ == "__main__":1217 main_orders()1218#### SPEC1219#1220#1221# 1 - PRINT ORDERS...

Full Screen

Full Screen

betClass.py

Source:betClass.py Github

copy

Full Screen

1# -*- coding: utf-8 -*-2"""3Subclass of actinfClass4Sets up the necessary matrices to simulate the Kolling_2014 experiments. There5are two variants: setMDP and setMDPMultVar. The difference between them is in6whether the action pairs (probabilities and rewards of the risky and safe7options) are fixed or are allowed to change.8Note that as a subclass of actinfClass, the exampleFull() method is available9and works with this particular paradigm with no further modification.10Uses:11 The initiation of the class accepts four optional inputs:12 paradigm {'kolling','kolling compact','small'} controlls which13 type of paradigm is used. For 'kolling', the exact14 numbers are used as in kolling_2014. For 'kolling15 compact' similar numbers are used, but total points16 and possible rewards are all divided by 10. This17 GREATLY reduces number of states and improves18 performance, while maintaining the same scheme. For19 'small', only 12 states are used; this is mostly for20 testing, as it works very quickly.21 changePairs {bool} Controls whether action pairs change from trial22 to trial (and thus, are many of them) or whether there23 is just one action pair and is fixed throughout the24 experiment. 'True' for the former.25 actionPairs dict{risklow, riskhigh, rewlow,rewhigh} dictionary with26 the action pair to be used; use only with27 changePairs = False. If not provided, defaults of28 {0.9,0.3,1,3} will be used.29"""30from __future__ import print_function # Probably unnecessary.31import numpy as np32import itertools33import utils34import actinfClass as afc35class betMDP(afc.Actinf):36 def __init__(self, paradigm='kolling compact', changePairs=True,37 actionPairs=None, nS=None, nT=None, thres=None,38 obs_sd=None):39 """40 Initializes the instance with parameter values depending on the inputs.41 See the class's help for details on optional inputs.42 Additionally, calls setMDPMultVar() or setMDP, depending on inputs, to43 set all the MDP matrices for Active Inference.44 """45 if nS is not None:46 nS = int(nS)47 if nT is not None:48 nT = int(nT)49 if thres is not None:50 thres = int(thres)51 if paradigm == 'kolling':52 self.paradigm = 'kolling'53 self.pdivide = 154 if nS is not None:55 self.nS = nS56 else:57 self.nS = 2000 # max number of attainable points58 if thres is not None:59 self.thres = thres60 else:61 self.thres = 100062 elif paradigm == 'kolling compact':63 self.paradigm = 'kolling'64 self.pdivide = 1065 if nS is not None:66 self.nS = nS67 else:68 self.nS = 200 # max number of attainable points69 if thres is not None:70 self.thres = thres71 else:72 self.thres = 6073 else:74 if nS is not None:75 self.nS = nS76 else:77 self.nS = 20 # max number of attainable points78 if thres is not None:79 self.thres = thres80 else:81 self.thres = 582 self.paradigm = 'small'83 if nT is not None:84 self.nT = nT85 else:86 self.nT = 887 self.nU = 288 self.obsnoise = 0 # 0.001 #0.000189 if changePairs is True:90 self.setMDPMultVar(parameters=actionPairs, obs_sd=obs_sd)91 else:92 self.setMDP(parameters=actionPairs, obs_sd=obs_sd)93 def setMDP(self, parameters=None, obs_sd=None):94 """ Sets the observation and transition matrices, as well as the goals,95 the priors over initial states, the real initial state and the possible96 policies to evaluate.97 A single action pair is used for all trials. The probabilities and98 reward magnitudes of this pair is set by the actionPairs input to99 __init__. It can also be taken from the input 'parameters', when100 calling the method manually.101 If the action pair is to be specified, the input 'parameter' must be102 a dict with entries risklow, riskhigh, rewlow and rewhigh.103 """104 from scipy.stats import norm105 # checking inputs and assigning defaults if necessary106 if parameters != None:107 if not isinstance(parameters, dict):108 raise Exception('BadInput: ''parameters'' is not a dict')109 expectedParameters = ['risklow', 'riskhigh', 'rewlow', 'rewhigh']110 compareSets = set(parameters) & set(expectedParameters)111 if compareSets != set(expectedParameters):112 raise Exception('BadInput: ''Paremeters'' does not contain the \113 required parameters')114 else:115 risklow = parameters['risklow']116 riskhigh = parameters['riskhigh']117 rewlow = parameters['rewlow']118 rewhigh = parameters['rewhigh']119 else:120 risklow = 0.9121 riskhigh = 0.3122 rewlow = 1123 rewhigh = 3124 nS = self.nS125 nP = self.nP126 # Define the observation matrix127 if obs_sd is not None:128 A = np.zeros((nS * nP, nS * nP))129 for n in range(nS * nP):130 A[:, n] = norm.pdf(np.arange(nS * nP), n, obs_sd)131 else:132 A = np.eye(self.nS) + self.obsnoise133 A = A / sum(A, axis=0)134 # Define transition matrices135 B = np.zeros((self.nU, self.nS, self.nS))136 B[0] = np.diag(risklow * np.ones(self.nS - rewlow), -rewlow)137 np.fill_diagonal(B[0], 1 - risklow)138 B[0][-1, (-1 - rewlow):-1] = risklow139 B[0][-1, -1] = 1140 B[1] = np.diag(riskhigh * np.ones(self.nS - rewhigh), -rewhigh)141 np.fill_diagonal(B[1], 1 - riskhigh)142 B[1][-1, (-1 - rewhigh):-1] = riskhigh143 B[1][-1, -1] = 1144 # Define priors over last state (goal)145 C = np.zeros(self.nS)146 C[self.thres:] = 1147 C = C / sum(C)148 # Priors over initial state149 D = np.zeros(self.nS)150 D[0] = 1151 # Real initial state152 S = D.astype(int)153 # Policies: all combinations of 2 actions154 V = np.array(list(itertools.product(155 range(0, self.nU), repeat=self.nT)))156 # Preparing inputs for MDP157 self.A = A158 self.B = B159 self.C = C160 self.D = D161 self.S = S162 self.V = V.astype(int)163 self.alpha = 64164 self.beta = 4165 self.lambd = 0.005 # or 1/4?166 self.gamma = 20167 self.importMDP()168# self.H = np.zeros(self.H.shape)169 def setMDPMultVar(self, parameters=None, obs_sd=None):170 """Sets the observation and transition matrices, as well as the goals,171 the priors over initial states, the real initial state and the possible172 policies to evaluate.173 A set of action pairs is used here, and are to be selected randomly174 for each trial during simulations. To do this, a third action is175 created, B[2], which is called the evolution action. When chosen, the176 action pair changes to a random one (the distribution is not177 necessarily constant).178 If the action pairs are not supplied, they are taken from the method179 setActionPairs(). Otherwise, they must be provided in a dict with180 keys pL, pH, rL, rH, which must be numpy arrays of the same length.181 """182 from scipy.stats import norm183 # Working parameters. To be imported later184 nU = self.nU185 nS = self.nS186 nT = self.nT187# thres = self.thres188 obsnoise = self.obsnoise189 if parameters is not None:190 if not isinstance(parameters, dict):191 raise ValueError('Input ''parameters'' is not a dict')192 expectedParameters = ['pL', 'pH', 'rL', 'rH']193 compareSets = set(parameters) & set(expectedParameters)194 if compareSets != set(expectedParameters):195 raise ValueError(' ''paremeters'' does not contain the \196 required parameters')197 else:198 self.pL = parameters['pL']199 self.pH = parameters['pH']200 self.rL = parameters['rL']201 self.rH = parameters['rH']202 self.nP = nP = len(self.pL)203 else:204 # Default action pairs205 self.setActionPairs()206 nP = self.nP207 # Define observation matrix208 if obs_sd is not None:209 A = np.zeros((nS * nP, nS * nP))210 for n in range(nS * nP):211 A[:, n] = norm.pdf(np.arange(nS * nP), n, obs_sd)212 else:213 A = np.eye(nS * nP) + obsnoise / nP214 # Transition matrices215 # num2coords = np.unravel_index216 # coords2num = np.ravel_multi_index217 self.set_transition_matrices()218 # Define priors over last state (goal)219 self.set_prior_goals()220 # Priors over initial state221 D = np.zeros(nS * nP)222 D[0] = 1223 # Real initial state224 S = D.astype(int)225 # Policies: all combinations of 2 actions226 V = np.array(list(itertools.product(range(0, nU), repeat=nT)))227 np.random.shuffle(V)228 # Preparing inputs for MDP229 self.A = A230# self.B = B231# self.C = C232 self.D = D233 self.S = S234 self.V = V.astype(int)235 self.alpha = 64236 self.beta = 4237 self.lambd = 0.005 # or 1/4?238 self.gamma = 20239 self.importMDP()240# self.H = np.zeros(self.H.shape)241 def priors_over_goals(self, threshold=None, just_return=False):242 """243 xxx DEPRECATED in favor of set_prior_goals() xxx244 Not deleted just in case...245 TODO: Delete if you are brave246 """247 import utils248 import numpy as np249# print 'This function has been deprecated in favor of set_prior_goals.\n'250 if threshold is None:251 thres = self.thres252 else:253 thres = threshold254 C = np.zeros(self.nS * self.nP)255 allothers = np.array(utils.allothers([range(thres, self.nS),256 range(self.nP)],257 (self.nS, self.nP)))258 C[allothers] = 1259 C = C / sum(C)260 if just_return is False:261 self.C = C262 self.thres = thres263 elif just_return is True:264 return C265 def setActionPairs(self):266 """ Sets the action pairs to be used for this instance. Which ones are267 selected depends on the instance's parameters.268 There is probably no use of this from the outside. Might be good to269 make it internal.270 """271 if self.paradigm == 'small':272 nP = 3273 pL = np.array([0.9, 0.6, 0.8])274 pH = np.array([0.3, 0.3, 0.2])275 rL = np.array([1, 2, 1])276 rH = np.array([3, 4, 4])277 elif self.paradigm == 'kolling':278 nP = 8279 pL = np.array([90, 60, 75, 55, 90, 60, 75, 80], dtype=float) / 100280 pH = np.array([35, 35, 35, 20, 45, 45, 40, 30], dtype=float) / 100281 rL = np.array([100, 180, 145, 145, 115, 150, 170, 120],282 dtype=int) / self.pdivide283 rH = np.array([265, 260, 245, 350, 240, 190, 245, 210],284 dtype=int) / self.pdivide285 self.nP = nP286 self.pL = pL287 self.pH = pH288 self.rL = rL289 self.rH = rH290 def set_transition_matrices(self, priorActPairs=None,291 reward=None, probability=None,292 just_return=False):293 """ Defines the transition matrices (actions) for the task, using294 the input priorActPairs as priors over the probability of transitioning295 to a new action pair.296 The input priorActPairs must be a vector of size nP, normalized, whose297 elements represent the biases towards each of the action pairs.298 """299 if reward is None:300 nU, nS, nP = self.nU, self.nS, self.nP301 pL, pH, rL, rH = self.pL, self.pH, self.rL, self.rH302 else:303 nU = 2304 nS = self.nS305 nP = 1306 pL = pH = [probability]307 rL = rH = [reward]308 if priorActPairs is None:309 pAP = np.ones(nP) / nP310 else:311 pAP = priorActPairs312 B = np.zeros((nU, nS * nP, nS * nP))313 for s in range(nS):314 for p in range(nP):315 nextsL = np.min((s + rL[p], nS - 1)).astype(int)316 nextsH = np.min((s + rH[p], nS - 1)).astype(int)317 mixL = utils.allothers([[nextsL], range(nP)], (nS, nP))318 mixH = utils.allothers([[nextsH], range(nP)], (nS, nP))319 this = utils.allothers([[s], range(nP)], (nS, nP))320 ifrom = [np.ravel_multi_index([s, p], (nS, nP), order='F')]321 B[np.ix_([0], mixL, ifrom)] = (pL[p] * pAP).reshape(1, nP, 1)322 B[np.ix_([0], this, ifrom)] = (323 (1 - pL[p]) * pAP).reshape(1, nP, 1)324 B[np.ix_([1], mixH, ifrom)] = (pH[p] * pAP).reshape(1, nP, 1)325 B[np.ix_([1], this, ifrom)] = (326 (1 - pH[p]) * pAP).reshape(1, nP, 1)327 if just_return is False:328 self.B = B329 else:330 return B331 def set_single_trans_mat(self, reward, probability):332 """ Creates a single transition matrix for the current task, given the333 reward and probability given as input.334 If the inputs are vectors (same length), a number of transition335 matrices is created that equals the number of elements in the vectors.336 Wrapper for set_transition_matrices337 """338 nB = len(reward)339 nS = self.nS340 B = np.zeros((nB, nS, nS))341 for b in range(nB):342 twoBs = self.set_transition_matrices(reward=reward[b],343 probability=probability[b],344 just_return=True)345 B[b] = twoBs[0]346 return B347 def set_prior_goals(self, select_shape='flat', shape_pars=None,348 convolute=True, cutoff=True, just_return=False):349 """ Wrapper for the functions prior_goals_flat/Ramp/Unimodal.350 Sets priors over last state (goals) in different shapes for testing351 the effects on the agent's behavior.352 The three options are 'flat', which is what is normally set in cbet.py,353 'ramp', which is a ramping-up that starts at threshold, and 'unimodal',354 which uses a Gaussian to set up a 'hump' after threshold.355 Uses:356 goals = setPriorGoals(mdp [,select_shape] [, rampX1] [, Gmean]357 [, Gscale] [,convolute] [,just_return])358 Inputs:359 select_shape {'flat','ramp','unimodal','unimodal_s','sigmoid'}360 selects which shape is to be used. When selecting361 'ramp', the optional input rampX1 can be selected362 (default 1). When using 'unimodal', Gmean and363 Gscale can be set to change the mean (in Trial364 number) and scale of the Gaussian In unimodal_s, mu365 is changed to threshold+mu. Selecting a Gmean366 pre-threshold and a value for cutoff of False cause367 the 'hump' to be invisible and the priors will be368 an exponential ramp down. 'sigmoid' requires the369 two parameters for center and slope.370 shape_pars Tuple with the parameters required for the shape371 selected. They are:372 rampX1 {x} determines the initial point for the ramp,373 which uniquely determines the slope of the ramp.374 Gmean, Gscale {x}{y} determine the mean and the scale of the375 Gaussian for the unimodal and unimodal_s376 versions. In case of unimodal_s, the value of377 Gmean is shifted to threshold+mu.378 Scenter, Sslope Center and slope for using when 'sigmoid' is used.379 convolute {bool} If True, C will be calculated for the full380 state space nS*nU, where nU is the number of action381 pairs. If False, C will be in the nS space.382 cutoff {bool} If True, the priors over last state (C) will383 be set to zero pre-threshold and re-normalized.384 just_return {bool} If True, the calculated value for C will be385 returned and no further action taken. If False,386 both C and lnC will be written into self.387 Outputs:388 (Note: when just_return is False, nothing is returned)389 goals [nS] are the resulting priors over last state.390 """391 if shape_pars is not None:392 if isinstance(shape_pars[0], str):393 select_shape = shape_pars[0]394 shape_pars = shape_pars[1:]395# for item in shape_pars:396# if isinstance(item, list):397# item = item[0]398 if select_shape == 'flat':399 goals = self.prior_goals_flat(convolute, just_return=True)400 elif select_shape == 'ramp':401 rampX1 = shape_pars[0]402# raise ValueError('A value for rampX1 must be provided when using'+403# ' ''ramp''')404 goals = self.prior_goals_ramp(rampX1=rampX1,405 convolute=convolute, just_return=True)406 elif select_shape == 'unimodal':407 Gmean, Gscale = shape_pars408# raise ValueError('Values for Gmean and Gscale must be provided '+409# 'when using ''unimodal''')410 goals = self.prior_goals_unimodal(Gmean, Gscale,411 convolute=convolute,412 cutoff=cutoff, just_return=True)413 elif select_shape == 'unimodal_s':414 Gmean, Gscale = shape_pars415 Gmean += self.thres416 goals = self.prior_goals_unimodal(Gmean, Gscale,417 convolute=convolute,418 cutoff=cutoff, just_return=True)419 elif select_shape == 'sigmoid':420 Scenter, Sslope = shape_pars421# raise ValueError('Values for Scenter and Sslope must be '+422# 'provided when using ''sigmoid''')423 goals = self.prior_goals_sigmoid(Scenter, Sslope,424 convolute=convolute, just_return=True,425 cutoff=cutoff)426 elif select_shape == 'sigmoid_s':427 Scenter, Sslope = shape_pars428 Scenter += self.thres429# raise ValueError('Values for Scenter and Sslope must be '+430# 'provided when using ''sigmoid''')431 goals = self.prior_goals_sigmoid(Scenter, Sslope,432 convolute=convolute, just_return=True,433 cutoff=cutoff)434 elif select_shape == 'exponential':435 exponent = shape_pars[0]436 goals = self.prior_goals_exponential(exponent,437 convolute=convolute, just_return=True,438 cutoff=cutoff)439 else:440 raise ValueError('Unrecognized value for select_shape')441 if just_return is True:442 return goals443 elif just_return is False and convolute is True:444 self.C = goals445 self.C += (np.min(self.C) == 0) * np.exp(-16)446 self.C = self.C / self.C.sum(axis=0)447 self.lnC = np.log(self.C)448 else:449 raise ValueError('Bad combination of just_return and convolute')450 def prior_goals_sigmoid(self, Scenter, Sslope, convolute=True,451 just_return=True, slope_div=10, cutoff=True):452 """ To be called from set_prior_goals().453 NOTE: slope_div rescales the S parameter in the sigmoid (slope). This454 is done to avoid numerical problems when saving posteriors over455 actions in invert_parameters. It can be set to 1 to remove the effect.456 """457 def sigmoid(C, S, X): return 1 / (1 + np.exp(-S / slope_div * (X - C)))458 points = np.arange(self.nS)459 goals = sigmoid(Scenter, Sslope, points)460 if convolute:461 goals = np.tile(goals, self.nP)462 goals /= goals.sum()463 if cutoff:464 goals[:self.thres] = 0465 goals /= goals.sum()466 if just_return:467 return goals468 else:469 self.C = goals470 self.lnC = np.log(goals)471 def prior_goals_flat(self, convolute=True, just_return=True):472 """To be called from set_prior_goals()."""473 from utils import allothers474 if convolute is True:475 goals = np.zeros(self.nS * self.nP, dtype=float)476 indices = np.array(allothers([range(self.thres, self.nS),477 range(self.nP)], (self.nS, self.nP)),478 dtype=int)479 goals[indices] = 1.0 / indices.size480 elif convolute is False:481 goals = np.zeros(self.nS, dtype=float)482 goals[self.thres:] = 1.0 / goals[self.thres:].size483 if just_return is True:484 return goals485 if just_return is False and convolute is True:486 self.C = goals487 self.lnC = np.log(goals)488 else:489 raise ValueError('Bad combination of just_return and convolute')490 def prior_goals_ramp(self, rampX1, convolute=True, just_return=True):491 """ Creates goals as an increasing or decreasing ramp, depending on the492 value given for rampX1.493 rampX1 is the initial value. That is, the value of the first point494 after threshold. If rampX1 is smaller than M (where M is the number of495 points past-threshold), then the ramp is increasing. The slope is496 calculated automatically (since rampX1 determines it uniquely).497 To be called from set_prior_goals().498 """499 from utils import allothers500 thres = self.thres501 nS = self.nS502 pastThres = nS - thres503 nP = self.nP504 minX1 = 0505 maxX1 = 2.0 / pastThres506 if rampX1 < minX1 or rampX1 > maxX1:507 raise ValueError('Initial point X1 is outside of allowable ' +508 'limits for this task. min = %f, max = %f'509 % (minX1, maxX1))510 if rampX1 == 1.0 / pastThres:511 raise ValueError('rampX1 is invalid. For this value, use ''flat''' +512 ' instead')513 slope = (2.0 / pastThres - 2.0 * rampX1) / (pastThres - 1)514 stateRamp = rampX1 + slope * np.arange(pastThres)515 istateR = np.arange(self.thres, self.nS)516 if convolute is False:517 goals = np.zeros(nS)518 goals[thres:] = stateRamp519 else:520 goals = np.zeros(nS * nP)521 for ix, vx in enumerate(istateR):522 indices = np.array(allothers([[vx], range(self.nP)],523 (self.nS, self.nP)))524 goals[indices] = stateRamp[ix]525 goals = goals / goals.sum()526 if just_return is True:527 return goals528 elif just_return is False and convolute is True:529 self.C = goals530 self.lnC = np.log(goals)531 else:532 raise ValueError('Bad combination of just_return and convolute')533 def prior_goals_exponential(self, exponent, convolute=True, cutoff=True,534 just_return=True):535 """ Creates priors that follow an exponential law with the given power.536 To be called from set_prior_goals().537 """538 points = np.arange(self.nS)539 expoints = (0.1 * points)**(exponent / 10)540 if cutoff is True:541 expoints[:self.thres] = 0542 if convolute is True:543 expoints = np.tile(expoints, self.nP)544 expoints /= expoints.sum()545 if just_return is True:546 return expoints547 elif just_return is False and convolute is True:548 self.C = expoints549 self.lnC = np.log(expoints)550 else:551 raise ValueError('Bad combination of just_return and convolute')552 def prior_goals_unimodal(self, Gmean, Gscale,553 convolute=True, cutoff=True, just_return=True):554 """ Sets the priors over last state (goals) to a Gaussian distribution,555 defined by Gmean and Gscale. To be called from set_prior_goals().556 """557# from utils import allothers558 from scipy.stats import norm559 points = np.arange(self.nS)560 npoints = norm.pdf(points, Gmean, Gscale)561 if cutoff is True:562 npoints[:self.thres] = 0563 if convolute is False:564 goals = npoints565 else:566 # goals = np.zeros(self.Ns)567 # istateR = np.arange(self.thres, self.nS, dtype=int)568 # for ix,vx in enumerate(istateR):569 # indices = np.array(allothers([[vx],range(self.nP)],570 # (self.nS,self.nP)))571 # goals[indices] = npoints[vx]572 goals = np.tile(npoints, self.nP)573 goals = goals / goals.sum()574 if just_return is True:575 return goals576 elif just_return is False and convolute is True:577 self.C = goals578 self.lnC = np.log(goals)579 else:580 raise ValueError('Bad combination of just_return and convolute')581 def print_table_results(self, results=None):582 """ Prints a table using the results from the Example from actinfClass.583 Data for other runs can be passed as optional arguments. For this, the584 input 'results' should be a dictionary with the following entries:585 RealStates [nT] a vector containing the real state for each586 trial. It is assumed that this is in the full587 space, not just the physical space.588 Actions [nT] vector with actions at each trial.589 PostActions [nU, nT] matrix with the posteriors over actions at590 each trial.591 """592 from tabulate import tabulate593 import numpy as np594 trial_number = range(self.nT)595 pH = self.pH596 pL = self.pL597 rH = self.rH598 rL = self.rL599 expH = pH * rH600 expL = pL * rL601 if results is not None:602 Results = results603 else:604 Results = self.Example605 real_state, action_pair = np.unravel_index(Results['RealStates'],606 (self.nS, self.nP), order='F')607 actions = Results['Actions']608 post_actions = Results['PostActions']609 table_data = []610 for t in range(self.nT):611 table_data.append([trial_number[t], real_state[t], action_pair[t],612 actions[t], expL[action_pair[t]],613 expH[action_pair[t]],614 post_actions[t][0], post_actions[t][1]])615 table_headers = ['Trial', 'State', 'Act Pair', 'Action', 'expL', 'expH',616 'ProbAct0', 'ProbAct1']617 print(tabulate(table_data, headers=table_headers))618 def all_points_and_trials(self, preserve_all=False):619 """ Wrapper to do all_points_and_trials_small/kolling, depending on the620 paradigm621 """622 if self.paradigm == 'small':623 points, trials_needed = self.all_points_and_trials_small(624 preserve_all)625 return points, trials_needed626 elif self.paradigm == 'kolling':627 points, trials_needed = self.all_points_and_trials_kolling(628 preserve_all)629 return points, trials_needed630 else:631 raise ValueError('Unknown paradigm')632 return None, None633 def all_points_and_trials_small(self, preserve_all=False):634 """ Calculates all the possible points attainable during this task, and635 all the possible number of trials in which the agent could have gotten636 each of these points.637 The 'runs' in which the points go over the threshold are eliminated, so638 that the ouputs are not so big. This can be toggled on and off with the639 input preserve_all.640 Outputs:641 points Array with all the possible number of points642 that can be gained in this game.643 trials_needed Array that, for every element of 'points' (see644 above) has the minimum number of trials needed645 to attain them.646 """647 import itertools648 rL = self.rL649 rH = self.rH650 actPairs = np.unique(np.concatenate(([0], rL, rH)))651 numberActPairs = actPairs.size652 # All possible combinations of all actions:653 Vb = np.array(list(itertools.product(range(numberActPairs),654 repeat=self.nT)))655 # Number of trials needed for a given number of points:656 trialsNeeded = np.zeros(Vb.shape[0], dtype=int)657 # Calculate these points (save in V).658 for r, row in enumerate(Vb):659 trialsNeeded[r] = self.nT - Vb[r, Vb[r, :] == 0].size660 points = np.sum(Vb, axis=1) # Points gained661 if preserve_all is False:662 under_thres = points < self.thres663 points = points[under_thres]664 trialsNeeded = trialsNeeded[under_thres]665 else:666 # Eliminate those past nS667 under_nS = points < self.nS668 points = points[under_nS]669 trialsNeeded = trialsNeeded[under_nS]670# return points, trialsNeeded671 uniquePoints = np.unique(points)672 uniqueTrials = np.zeros(uniquePoints.shape, dtype=np.int64)673 for i, up in enumerate(uniquePoints):674 uniqueTrials[i] = min(trialsNeeded[points == up])675 return uniquePoints, uniqueTrials676 def all_points_and_trials_kolling(self, preserve_all=False):677 """ Calculates all the possible points attainable during this task, and678 all the possible number of trials in which the agent could have gotten679 each of these points.680 The 'runs' in which the points go over the threshold are eliminated, so681 that the ouputs are not so big. This can be toggled on and off with the682 input preserve_all.683 Outputs:684 points Array with all the possible number of points685 that can be gained in this game.686 trials_needed Array that, for every element of 'points' (see687 above) has the minimum number of trials needed688 to attain them.689 """690 import numpy as np691 import itertools692 rL = self.rL693 rH = self.rH694 nP = self.nP695 Vb = np.array(list(itertools.product(range(3), repeat=nP)))696 V = np.zeros(Vb.shape)697 points = np.zeros(Vb.shape[0], dtype=int)698 trials = np.zeros(Vb.shape[0], dtype=int)699 for r, row in enumerate(Vb):700 for t, val in enumerate(row):701 V[r, t] = (val == 1) * rL[t] + (val == 2) * rH[t]702 points[r] = V[r, :].sum()703 trials[r] = self.nT - Vb[r, Vb[r, :] == 0].size704 if preserve_all is False:705 under_thres = points < self.thres706 points = points[under_thres]707 trials = trials[under_thres]708 else:709 # Eliminate those past nS710 under_nS = points < self.nS711 points = points[under_nS]712 trials = trials[under_nS]713 uniquePoints = np.unique(points)714 uniqueTrials = np.zeros(uniquePoints.shape, dtype=int)715 for i, up in enumerate(uniquePoints):716 uniqueTrials[i] = min(trials[points == up])...

Full Screen

Full Screen

swear_handler.py

Source:swear_handler.py Github

copy

Full Screen

1import traceback2from inspect import iscoroutinefunction3from typing import Any, Callable, Optional, Tuple, Type, Union4from vkbottle.modules import logger5def swear(6 exception: Union[Exception, Type[Exception], Tuple[Union[Exception, Type[Exception]], ...]],7 exception_handler: Optional[Callable] = None,8 just_log: bool = False,9 just_return: bool = False,10) -> Any:11 """Swear catcher allows to handle exceptions | Used as a decorator12 :param exception: Exception(s) to handle13 :param exception_handler: async exception handler14 :param just_log: should swear handler log the error15 :param just_return: should the exception just be returned16 >>> @swear(RuntimeError, just_return=True)17 >>> def function():18 >>> raise RuntimeError("Oh no!")19 >>> function()20 >>> RuntimeError("Oh no!")21 """22 if not isinstance(exception, tuple):23 exception = (exception,)24 def decorator(func: Callable):25 async def asynchronous_wrapper(*args, **kwargs):26 try:27 return await func(*args, **kwargs)28 except exception as e:29 if exception_handler is not None:30 return await exception_handler(e, *args, **kwargs)31 elif just_log:32 logger.error(33 "{} (handling with swear) has thrown an exception: \n\n{}",34 func.__name__,35 traceback.format_exc(),36 )37 elif just_return:38 return e39 finally:40 logger.debug("Function {} was handled with swear", func.__name__)41 def synchronous_wrapper(*args, **kwargs):42 try:43 return func(*args, **kwargs)44 except exception as e:45 if exception_handler is not None:46 return exception_handler(e, *args, **kwargs)47 elif just_log:48 logger.error(49 "{} (handling with swear) has thrown an exception: \n\n{}",50 func.__name__,51 traceback.format_exc(),52 )53 elif just_return:54 return e55 finally:56 logger.debug("Function {} was handled with swear", func.__name__)57 if iscoroutinefunction(func) and iscoroutinefunction(exception_handler):58 return asynchronous_wrapper59 return synchronous_wrapper...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run refurb automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful