How to use _get_job_data method in avocado

Best Python code snippet using avocado_python

attribute.py

Source:attribute.py Github

copy

Full Screen

...10 self.import_attribute_groups_attribute_relation(cr, uid, job)11 return True12 def import_attribute_sets(self, cr, uid, job, context=None):13 set_obj = self.pool.get('product.attribute.set')14 records = self._get_job_data(cr, uid, job, 'catalog_product_attribute_set.list', [])15 for record in records:16 vals = set_obj.prepare_odoo_record_vals(cr, uid, job, record)17 result = set_obj.upsert_mage_record(cr, uid, vals)18 print result19 return True20 def import_attribute_groups(self, cr, uid, job, context=None):21 set_obj = self.pool.get('product.attribute.set')22 group_obj = self.pool.get('product.attribute.group')23 set_ids = set_obj.search(cr, uid, [])24 set_data = set_obj.read(cr, uid, set_ids, fields=['external_id'])25 sets = [x['external_id'] for x in set_data]26 records = self._get_job_data(cr, uid, job, 'oo_catalog_product_attribute_group.list', \27 [{'attribute_set_id': {'in': sets}}])28 for record in records:29 vals = group_obj.prepare_odoo_record_vals(cr, uid, job, record)30 result = group_obj.upsert_mage_record(cr, uid, vals)31 print result32 return True33 34 def import_attributes(self, cr, uid, job, context=None):35 attribute_obj = self.pool.get('product.attribute')36 records = self._get_job_data(cr, uid, job, 'oo_catalog_product_attribute.list', [])37 for record in records:38 if record.get('is_visible') == '0':39 continue40 vals = attribute_obj.prepare_odoo_record_vals(cr, uid, job, record)41 result = attribute_obj.upsert_mage_record(cr, uid, vals)42 return True43 44 def import_attribute_values(self, cr, uid, job, context=None):45 attr_obj = self.pool.get('product.attribute')46 value_obj = self.pool.get('product.attribute.value')47 attr_ids = attr_obj.search(cr, uid, [('is_user_defined', '=', True)])48 attr_data = attr_obj.read(cr, uid, attr_ids, fields=['external_id'])49 attributes = [x['external_id'] for x in attr_data]50 for attribute in attributes:51 records = self._get_job_data(cr, uid, job, 'oo_catalog_product_attribute.options', [attribute])52 for record in records:53 vals = value_obj.prepare_odoo_record_vals(cr, uid, job, record)54 value_obj.upsert_mage_record(cr, uid, vals)55 return True56 def process_mage_attribute_response(self, cr, uid, job, mappinglines, records):57 target_obj = self.pool.get(job.mapping.model_id.model)58 for record in records:59 #This needs to be fixed on Magento60 if record.get('is_visible') == '0':61 continue62 63 return True64 def find_attribute_group_relationship(self, cr, uid, attr_data):65 result = {}66 attr_obj = self.pool.get('product.attribute')67 group_obj = self.pool.get('product.attribute.group')68 attr_ids = attr_obj.search(cr, uid, [('external_id', '=', attr_data['attribute_id'])])69 #This needs to be fixed in Magento70 if not attr_ids:71 return False72 result['attribute'] = attr_ids[0]73 group_ids = group_obj.search(cr, uid, [('external_id', '=', attr_data['group_id'])])74 result['group'] = group_ids[0]75 return result76 def import_attribute_groups_attribute_relation(self, cr, uid, job, context=None):77 set_obj = self.pool.get('product.attribute.set')78 set_ids = set_obj.search(cr, uid, [])79 set_data = set_obj.read(cr, uid, set_ids, fields=['external_id'])80 sets = [x['external_id'] for x in set_data]81 group_obj = self.pool.get('product.attribute.group')82 buffer = {}83 for set in sets:84 records = self._get_job_data(cr, uid, job, 'oo_catalog_product_attribute.relations', [set])85 count = 086 for record in records:87 if not record['attribute_id']:88 count += 189 continue90 #This needs to be fixed in Magento91 dic = self.find_attribute_group_relationship(cr, uid, record)92 if not dic:93 continue94 if dic['group'] not in buffer.keys():95 buffer[dic['group']] = [dic['attribute']]96 else:97 buffer[dic['group']].append(dic['attribute'])98 for k, v in buffer.items():...

Full Screen

Full Screen

metadata.py

Source:metadata.py Github

copy

Full Screen

...15 except Exception, e:16 raise osv.except_osv(_('Metadata Sync Error'), _(str(e)))17 return True18 def sync_mage_carriers(self, cr, uid, job, context=None):19 records = self._get_job_data(cr, uid, job, 'sales_order.shipping_methods', [])20 carrier_obj = self.pool.get('delivery.carrier')21 for record in records:22 for method in record['methods']:23 vals = carrier_obj.prepare_odoo_record_vals(cr, uid, job, method)24 carrier = carrier_obj.upsert_mage_record(cr, uid, vals)25 print carrier26 return True27 def sync_all_taxes(self, cr, uid, job, context=None):28 records = self._get_job_data(cr, uid, job, 'sales_order.taxes_info', [])29 tax_obj = self.pool.get('account.tax')30 for tax in records:31 print tax32 tax_ids = tax_obj.search(cr, uid, [('name', '=', tax['code'])])33 if tax_ids:34 continue35 else:36 vals = {'name': tax['code'],37 'amount': float(tax['rate']) / 100,38 'mage_tax': True,39 'description': tax['code'],40 }41 tax_id = tax_obj.create(cr, uid, vals)42 return True43 def sync_instance_order_statuses(self, cr, uid, job, context=None):44 records = self._get_job_data(cr, uid, job, 'sales_order.get_order_states', [])45 mage_order_state_obj = self.pool.get('mage.mapping.order.state')46 for state, state_name in records.items():47 statuses = self._get_job_data(cr, uid, job, 'sales_order.get_order_statuses', [state])48 for k, v in statuses.items():49 print 'State', state50 print 'Status', k51 vals = {'mage_order_state': state,52 'mage_order_status': k,53 'mage_order_status_name': v,54 'name': v55 }56 existing_ids = mage_order_state_obj.search(cr, uid,57 [('mage_order_status', '=', k)])58 if not existing_ids:59 result = mage_order_state_obj.create(cr, uid, vals)60 print (True, result)61 else:62 mage_order_state_obj.write(cr, uid, existing_ids[0], vals)63 print (False, existing_ids[0])64 return True65 def import_websites(self, cr, uid, job, context=None):66 website_obj = self.pool.get('mage.website')67 records = self._get_job_data(cr, uid, job, 'oo_websites.list', [])68 for record in records:69 vals = website_obj.prepare_odoo_record_vals(cr, uid, job, record)70 result = website_obj.upsert_mage_record(cr, uid, vals)71 print result72 return True73 def import_store_groups(self, cr, uid, job, context=None):74 group_obj = self.pool.get('mage.store.group')75 records = self._get_job_data(cr, uid, job, 'oo_groups.list', [])76 for record in records:77 vals = group_obj.prepare_odoo_record_vals(cr, uid, job, record)78 result = group_obj.upsert_mage_record(cr, uid, vals)79 print result80 return True81 def import_store_views(self, cr, uid, job, context=None):82 storeview_obj = self.pool.get('mage.store.view')83 records = self._get_job_data(cr, uid, job, 'oo_storeviews.list', [])84 for record in records:85 vals = storeview_obj.prepare_odoo_record_vals(cr, uid, job, record)86 result = storeview_obj.upsert_mage_record(cr, uid, vals)87 print result88 return True89# def process_mage_websites_response(self, cr, uid, job, mappinglines, records):90# target_obj = self.pool.get(job.mapping.model_id.model)91# for record in records:92# vals = self._transform_record(cr, uid, job, record, 'from_mage_to_odoo', mappinglines)93# result = target_obj.upsert_mage_data(cr, uid, vals)...

Full Screen

Full Screen

hosts_veeam.py

Source:hosts_veeam.py Github

copy

Full Screen

...18 if result.status_code != 200:19 raise ValueError("Couldn't get a token, check your username or password.")20 # Return the token21 return result.json()["access_token"]22def _get_job_data(host, auth_token):23 # build the request24 api_path = '/api/v1/jobs'25 header = {'x-api-version': '1.0-rev1', "accept": "application/json", "Authorization": "Bearer " + auth_token}26 # Get the job data27 result = requests.get(url=host + api_path, headers=header, verify=False)28 # Check if we got a good result29 if result.status_code != 200:30 raise ValueError("Couldn't get the job data, status code: "+result.status_code())31 # Return the job data32 return result.json()["data"]33def get_hosts(host, username, password):34 try:35 # Get access token36 token = _get_auth_token(host, username, password)37 # Get job data38 data = _get_job_data(host, token)39 except ValueError as e:40 sys.exit(e)41 # Extract the Hostnames42 job_hosts = []43 for jobs in data:44 for obj in jobs["virtualMachines"]["includes"]:45 job_hosts.append(obj["inventoryObject"]["name"])46 # Return the Hostnames...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful