How to use get_value_by_index method in Slash

Best Python code snippet using slash

test_google_bigquery_origin.py

Source:test_google_bigquery_origin.py Github

copy

Full Screen

...114 # Start pipeline and verify correct rows are received.115 logger.info('Starting pipeline')116 sdc_executor.start_pipeline(pipeline).wait_for_finished()117 get_value_by_index = lambda x: list(x[0].field.values())[x[1]].value118 rows_from_wiretap = [(int(get_value_by_index((record, 0)))119 if get_value_by_index((record, 0)) is not None else None,120 get_value_by_index((record, 1)),121 float(get_value_by_index((record, 2)))122 if get_value_by_index((record, 2)) is not None else None,123 str(get_value_by_index((record, 3)))124 if get_value_by_index((record, 3)) is not None else None,125 get_value_by_index((record, 4)),126 get_value_by_index((record, 5)).strftime("%Y-%m-%d")127 if get_value_by_index((record, 5)) is not None else None,128 get_value_by_index((record, 6)).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]129 if get_value_by_index((record, 6)) is not None else None,130 '{} UTC'.format(get_value_by_index((record, 7)).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3])131 if get_value_by_index((record, 7)) is not None else None,132 get_value_by_index((record, 8)).strftime('%H:%M:%S.%f')[:-3]133 if get_value_by_index((record, 8)) is not None else None,134 get_value_by_index((record, 9)) if get_value_by_index((record, 9)) is not None else None)135 for record in wiretap.output_records]136 assert rows_from_wiretap == ROWS_TO_INSERT137 finally:138 bigquery_client.delete_dataset(dataset_ref, delete_contents=True)139# SDC-10836. Google BigQuery is not respecting batchSize and at the same time is duplicating records140@gcp141@sdc_min_version('2.7.0.0')142def test_google_bigquery_origin_batch_handling(sdc_builder, sdc_executor, gcp):143 """Verify proper batch handling by the BigQuery origin.144 In this test, we write 8 records to BigQuery with a batch size of 3, verifying that each batch is of145 size 3 and that 8 total records are captured. The small numbers are used because of the limitations146 described in SDC-8765.147 """148 MAX_BATCH_SIZE = 3...

Full Screen

Full Screen

database_handler.py

Source:database_handler.py Github

copy

Full Screen

1from alchemy import *2import alchemy3from sqlalchemy.orm import sessionmaker4Session = sessionmaker(bind=engine)5def get_value_by_index(data,index):6 try:7 val = str(data.get(index))8 except Exception as e:9 val = None10 finally:11 return val12loginInfo = {13 "clientId": None,14 "serverRef": None,15 "privateKey": None,16 "publicKey": None,17 "key": {18 "encKey": None,19 "macKey": None20 }21 };22connInfo = {23 "clientToken": None,24 "serverToken": None,25 "browserToken": None,26 "secret": None,27 "sharedSecret": None,28 "me": None29 };30def insert_user(obj,wid,login_info,conn_info):31 data = obj[1]32 lg = get_value_by_index(data,"lg")33 server_token = get_value_by_index(data,"serverToken")34 lc = get_value_by_index(data,"lc")35 ref = get_value_by_index(data,"ref")36 platform = get_value_by_index(data,"platform")37 battery = int(get_value_by_index(data,"battery"))38 tos = int(get_value_by_index(data,"tos"))39 secret = get_value_by_index(data,"secret")40 pushname = get_value_by_index(data,"pushname")41 plugged = bool(get_value_by_index(data,"plugged"))42 proto_version = get_value_by_index(data,"protoVersion")43 connected = bool(get_value_by_index(data,"connected"))44 browser_token =get_value_by_index(data,"browserToken")45 client_token = get_value_by_index(data,"clientToken")46 bin_version = int(get_value_by_index(data,"binVersion"))47 is_response = bool(get_value_by_index(data,"isResponse"))48 49 phone_device_model = get_value_by_index(data["phone"],"device_model")50 phone_wa_version = get_value_by_index(data["phone"],"wa_version")51 phone_mcc = int(get_value_by_index(data["phone"],"mcc"))52 phone_os_version = get_value_by_index(data["phone"],"os_version")53 phone_os_build_number =get_value_by_index(data["phone"],"os_build_number")54 phone_device_manufacturer = get_value_by_index(data["phone"],"device_manufacturer")55 phone_mnc = int(get_value_by_index(data["phone"],"mnc"))56 57 features_URL = get_value_by_index(data["features"],"URL")58 features_FLAGS = get_value_by_index(data["features"],"FLAGS")59 login_info_clientId = get_value_by_index(login_info,"clientId")60 login_info_serverRef = get_value_by_index(login_info,"serverRef")61 login_info_privateKey = get_value_by_index(login_info,"privateKey")62 login_info_encKey = get_value_by_index(login_info['key'],"encKey")63 login_info_macKey = get_value_by_index(login_info['key'],"macKey")64 conn_info_sharedSecret = get_value_by_index(conn_info,"sharedSecret")65 user = User_detail( wid , lg , server_token , lc , ref , platform , battery , tos , secret , pushname , plugged , proto_version , connected , browser_token , client_token , bin_version , is_response , phone_device_model , phone_wa_version , phone_mcc , phone_os_version , phone_os_build_number , phone_device_manufacturer , phone_mnc ,features_URL , features_FLAGS ,login_info_clientId , login_info_serverRef , login_info_privateKey ,login_info_encKey ,login_info_macKey , conn_info_sharedSecret)66 print(user)67 session = Session()68 session.add(user)69 session.commit()70def insert_contact(obj,wid):71 # contacts = obj[2]72 contacts = obj73 session = Session() 74 for contact in contacts:75 contact = contact[1]76 jid = get_value_by_index(contact,"jid")77 notify = get_value_by_index(contact,"notify")78 name = get_value_by_index(contact,"name")79 short = get_value_by_index(contact,"short")80 vname = get_value_by_index(contact,"vname")81 verify = bool(get_value_by_index(contact,"verify"))82 enterprise = bool(get_value_by_index(contact,"enterprise"))83 con = alchemy.contact(wid,jid,notify,name,short,vname,verify,enterprise)84 session.add(con)85 session.commit()86def insert_chat(obj,wid):87 chats = obj[2]88 session = Session() 89 for contact in chats:90 contact = contact[1]91 jid = get_value_by_index(contact,"jid")92 old_jid = get_value_by_index(contact,"old_jid")93 count = get_value_by_index(contact,"count")94 mute = get_value_by_index(contact,"mute")95 spam = bool(get_value_by_index(contact,"spam"))96 t = get_value_by_index(contact,"t")97 message = bool(get_value_by_index(contact,"message"))98 modify_tag = get_value_by_index(contact,"modify_tag")99 name = get_value_by_index(contact,"name")100 chat = alchemy.chat(wid,jid,name,old_jid,count,mute,spam,t,message,modify_tag)101 session.add(chat)102 session.commit()103def insert_message(obj,wid):104 pass105def decode_message_media(message):106 pass...

Full Screen

Full Screen

stack.py

Source:stack.py Github

copy

Full Screen

...50 else:51 stack.append(hold.pop())52 return stack53# Get value by the index of the stack, O(n)54def get_value_by_index(stack, idx):55 range__ = len(stack)56 value = 057 hold = []58 for i in range(range__):59 item = stack.pop()60 if i == range__-1-idx:61 value = item62 hold.append(item)63 for i in range(range__): # reverse back stack64 stack.append(hold.pop())65 return value66# copy one stack to another67def copy(stack1, stack2):68 hold = []69 range2__ = len(stack2)70 range1__ = len(stack1)71 for i in range(range1__):72 stack1.pop()73 for i in range(range2__):74 stack1.append(stack2.pop())75# Selection Sort76# at the end of the stack. O(n^2) but due to stack indexing O(n^3)77def stack_selection_sort(stack):78 lowest = 0;79 range__ = len(stack)80 for i in range(range__):81 lowest = get_value_by_index(stack, i)82 lowest_i = i83 for j in range(range__-i):84 if get_value_by_index(stack, j+i) < lowest:85 lowest = get_value_by_index(stack, j+i)86 lowest_i = j+i87 swap(stack, lowest_i, i)88 return stack89# collections deque90#thread safe, memory efficient, pop O(1) efficency91def basic_collections_stack():92 myStack = deque(range(10))93 myStack.append(99)94 print(f'myStack: {myStack}')95 myStack.pop()96 print(f'myStack: {myStack}')97 myStack.popleft()98 print(f'myStack: {myStack}')99# leetcode 155 min stack...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run Slash automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful