Best Python code snippet using tempest_python
test_portal_api.py
Source:test_portal_api.py  
1import requests2import json3import unittest4import warnings5from test_partal_data.ProjectData import *6from test_partal_data.getHeaders import Get_headers7from test_partal_data.getUrl import GetUrl8from test_partal_data.method import Requests9import time10import pytest11headers = Get_headers()12url = GetUrl()13json = Publish_Data()14class Test_House():15    '''æ¿å±å»ºç'''16    obj = Requests()17    warnings.simplefilter('ignore', ResourceWarning)18    def get(self, ID):19        with open('ID', "w") as f:20            f.write(str(ID))21    @property22    def read(self):23        '''è·åid'''24        with open('ID', 'r') as f:25            return f.read()26    def test_publish_001(self):27        '''åå¸ç¼è¾é¡¹ç®ä¿¡æ¯'''28        r = self.obj.post(url=url.getUrl_001, json=json.building_001, headers=headers.getheaders)29        print(r.json())30        if r.json()['code'] == None or r.json()['data'] == None or r.json()['data'] == 999:31            assert r.json()['code'] == str(0)32        else:33            self.get(r.json()['data'])34    def test_publish_002(self):35        '''åå¸ç¼è¾å让信æ¯'''36        r = self.obj.post(url=url.getUrl_002, json=json.building_002, headers=headers.getheaders)37        assert r.json()['code'] == str(0)38        assert r.json()['data'] == int(self.read)39    def test_publish_003(self):40        '''ä¸ä¼ å¾ç'''41        for i in range(10):42            r = self.obj.post(url=url.getUrl_003, files=json.building_file, headers=headers.getheaders_002,43                              json=json.building_003)44            if r.json()['code'] == None or r.json()['data'] == None or r.json()['data'] == 999:45                assert r.json()['code'] == str(0)46            else:47                self.save_photo(r.json()['data']['fullPath'])48    def save_photo(self, photo):49        '''ä¿åå¾çå°å'''50        with open('photo', 'a') as f:51            f.write(str(photo) + '\n')52    def test_publish_004(self):53        '''åå¸ä¸ä¼ éä»¶ä¿¡æ¯'''54        r = self.obj.post(url=url.getUrl_004, json=json_clf.building_004, headers=headers.getheaders)55        if r.json()['code'] == None or r.json()['data'] == None or r.json()['data'] == 999:56            assert r.json()['code'] == str(0)57    def test_publish_005(self):58        '''确认æ¿è¯ºå½'''59        r = self.obj.post(url=url.getUrl_005, json=json.building_005, headers=headers.getheaders)60        assert r.json()['code'] == str(0)61    def test_publish_006(self):62        '''æ¸
空photo'''63        with open('photo', 'r+', encoding='utf-8', ) as f:64            f.truncate(0)65json_Cml= Publish_Cml()66class Test_Cml():67    '''é使ºå¨å°'''68    obj = Requests()69    warnings.simplefilter('ignore', ResourceWarning)70    def get(self,ID):71        with open('ID', "w") as f:72            f.write(str(ID))73    @property74    def read(self):75        '''è·åid'''76        with open('ID', 'r') as f:77            return f.read()78    def test_Cml_001(self):79        '''åå¸ç¼è¾é¡¹ç®ä¿¡æ¯'''80        r=self.obj.post(url=url.getUrl_001, json=json_Cml.building_001, headers=headers.getheaders)81        print(r.json())82        if r.json()['code']==None or r.json()['data']==None or r.json()['data']==999:83            assert r.json()['code'] == str(0)84        else:85            self.get(r.json()['data'])86    def test_Cml_002(self):87        '''åå¸ç¼è¾å让信æ¯'''88        r = self.obj.post(url=url.getUrl_002, json=json_Cml.building_002, headers=headers.getheaders)89        assert r.json()['code'] == str(0)90        assert r.json()['data'] == int(self.read)91    def test_Cml_003(self):92        '''ä¸ä¼ å¾ç'''93        for i in range(10):94            r = self.obj.post(url=url.getUrl_003, files=json_Cml.building_file, headers=headers.getheaders_002,json=json.building_003)95            if r.json()['code']==None or r.json()['data']==None or r.json()['data']==999:96                assert r.json()['code'] == str(0)97            else:98                self.save_photo(r.json()['data']['fullPath'])99            #time.sleep(2)100    def save_photo(self,photo):101        '''ä¿åå¾çå°å'''102        with open('photo', 'a') as f:103            f.write(str(photo)+'\n')104    def test_Cml_004(self):105        '''åå¸ä¸ä¼ éä»¶ä¿¡æ¯'''106        r = self.obj.post(url=url.getUrl_004,data=json_clf.building_004,headers=headers.getheaders)107        #print(json_Cml.building_004)108        #print(type(json_Cml.building_004))109        print(r.json())110        if r.json()['code'] == None or r.json()['data'] == None or r.json()['data'] == 999:111            assert r.json()['code'] == str(0)112    def test_Cml_005(self):113        '''确认æ¿è¯ºå½'''114        r=self.obj.post(url=url.getUrl_005,json=json_Cml.building_005,headers=headers.getheaders)115        assert r.json()['code'] == str(0)116    # def test_Cml_006(self):117    #     '''æ¸
空photo'''118    #     with open('photo', 'r+', encoding='utf-8', ) as f:119    #         f.truncate(0)120json_clf = Publish_Clf()121class Test_Clf():122    '''åæ·æ¿å
å°'''123    obj = Requests()124    warnings.simplefilter('ignore', ResourceWarning)125    def get(self,ID):126        with open('ID', "w") as f:127            f.write(str(ID))128    @property129    def read(self):130        '''è·åid'''131        with open('ID', 'r') as f:132            return f.read()133    def test_clf_001(self):134        '''åå¸ç¼è¾é¡¹ç®ä¿¡æ¯'''135        r=self.obj.post(url=url.getUrl_001, json=json_clf.clf_001, headers=headers.getheaders)136        if r.json()['code']==None or r.json()['data']==None or r.json()['data']==999:137            assert r.json()['code'] == str(0)138        else:139            self.get(r.json()['data'])140    def test_clf_002(self):141        '''åå¸ç¼è¾å让信æ¯'''142        r = self.obj.post(url=url.getUrl_002, json=json_clf.clf_002, headers=headers.getheaders)143        assert r.json()['code'] == str(0)144        assert r.json()['data'] == int(self.read)145    def test_clf_003(self):146        '''ä¸ä¼ å¾ç'''147        for i in range(10):148            r = self.obj.post(url=url.getUrl_003, files=json_clf.building_file, headers=headers.getheaders_002,json=json_clf.clf_003)149            if r.json()['code']==None or r.json()['data']==None or r.json()['data']==999:150                assert r.json()['code'] == str(0)151            else:152                self.save_photo(r.json()['data']['fullPath'])153    def save_photo(self,photo):154        '''ä¿åå¾çå°å'''155        with open('photo', 'a') as f:156            f.write(str(photo)+'\n')157    def test_clf_004(self):158        '''åå¸ä¸ä¼ éä»¶ä¿¡æ¯'''159        r = self.obj.post(url=url.getUrl_004,json=json_clf.clf_004,headers=headers.getheaders)160        print(r.json())161        if r.json()['code'] == None or r.json()['data'] == None or r.json()['data'] == 999:162            assert r.json()['code'] == str(0)163    def test_clf_005(self):164        '''确认æ¿è¯ºå½'''165        r=self.obj.post(url=url.getUrl_005,json=json_clf.clf_005,headers=headers.getheaders)166        assert r.json()['code'] == str(0)167    def test_clf_006(self):168        '''æ¸
空photo'''169        with open('photo', 'r+', encoding='utf-8', ) as f:170            f.truncate(0)171json_wd = Publish_Woodland()172class Test_Wd():173    '''æå°'''174    obj = Requests()175    warnings.simplefilter('ignore', ResourceWarning)176    def get(self,ID):177        with open('ID', "w") as f:178            f.write(str(ID))179    @property180    def read(self):181        '''è·åid'''182        with open('ID', 'r') as f:183            return f.read()184    def test_wd_001(self):185        '''åå¸ç¼è¾é¡¹ç®ä¿¡æ¯'''186        r=self.obj.post(url=url.getUrl_001, json=json_wd.building_001, headers=headers.getheaders)187        print(r.json())188        if r.json()['code']==None or r.json()['data']==None or r.json()['data']==999:189            assert r.json()['code'] == str(0)190        else:191            self.get(r.json()['data'])192    def test_wd_002(self):193        '''åå¸ç¼è¾å让信æ¯'''194        r = self.obj.post(url=url.getUrl_002, json=json_wd.building_002, headers=headers.getheaders)195        assert r.json()['code'] == str(0)196        assert r.json()['data'] == int(self.read)197    def test_wd_003(self):198        '''ä¸ä¼ å¾ç'''199        for i in range(10):200            r = self.obj.post(url=url.getUrl_003, files=json_wd.building_file, headers=headers.getheaders_002,json=json.building_003)201            print(r.json())202            if r.json()['code']==None or r.json()['data']==None or r.json()['data']==999:203                assert r.json()['code'] == str(0)204            else:205                self.save_photo(r.json()['data']['fullPath'])206    def save_photo(self,photo):207        '''ä¿åå¾çå°å'''208        with open('photo', 'a') as f:209            f.write(str(photo)+'\n')210    def test_wd_004(self):211        '''åå¸ä¸ä¼ éä»¶ä¿¡æ¯'''212        r = self.obj.post(url=url.getUrl_004,json=json_clf.building_004,headers=headers.getheaders)213        print(r.json())214        if r.json()['code'] == None or r.json()['data'] == None or r.json()['data'] == 999:215            assert r.json()['code'] == str(0)216    def test_wd_005(self):217        '''确认æ¿è¯ºå½'''218        r=self.obj.post(url=url.getUrl_005,json=json_wd.building_005,headers=headers.getheaders)219        assert r.json()['code'] == str(0)220    def test_publish_006(self):221        '''æ¸
空photo'''222        with open('photo', 'r+', encoding='utf-8', ) as f:223            f.truncate(0)224json_wr= Publish_water()225class Test_Wr():226    '''æ°´é¢'''227    obj = Requests()228    warnings.simplefilter('ignore', ResourceWarning)229    def get(self,ID):230        with open('ID', "w") as f:231            f.write(str(ID))232    @property233    def read(self):234        '''è·åid'''235        with open('ID', 'r') as f:236            return f.read()237    def test_wr_001(self):238        '''åå¸ç¼è¾é¡¹ç®ä¿¡æ¯'''239        r=self.obj.post(url=url.getUrl_001, json=json_wr.building_001, headers=headers.getheaders)240        print(r.json())241        if r.json()['code']==None or r.json()['data']==None or r.json()['data']==999:242            assert r.json()['code'] == str(0)243        else:244            self.get(r.json()['data'])245    def test_wr_002(self):246        '''åå¸ç¼è¾å让信æ¯'''247        r = self.obj.post(url=url.getUrl_002, json=json_wr.building_002, headers=headers.getheaders)248        assert r.json()['code'] == str(0)249        assert r.json()['data'] == int(self.read)250    def test_wr_003(self):251        '''ä¸ä¼ å¾ç'''252        for i in range(10):253            r = self.obj.post(url=url.getUrl_003, files=json_wr.building_file, headers=headers.getheaders_002,json=json.building_003)254            if r.json()['code']==None or r.json()['data']==None or r.json()['data']==999:255                assert r.json()['code'] == str(0)256            else:257                self.save_photo(r.json()['data']['fullPath'])258    def save_photo(self,photo):259        '''ä¿åå¾çå°å'''260        with open('photo', 'a') as f:261            f.write(str(photo)+'\n')262    def test_wr_004(self):263        '''åå¸ä¸ä¼ éä»¶ä¿¡æ¯'''264        r = self.obj.post(url=url.getUrl_004,json=json_clf.building_004,headers=headers.getheaders)265        if r.json()['code'] == None or r.json()['data'] == None or r.json()['data'] == 999:266            assert r.json()['code'] == str(0)267    def test_wr_005(self):268        '''确认æ¿è¯ºå½'''269        r=self.obj.post(url=url.getUrl_005,json=json_wr.building_005,headers=headers.getheaders)270        assert r.json()['code'] == str(0)271    def test_wr_006(self):272        '''æ¸
空photo'''273        with open('photo', 'r+', encoding='utf-8', ) as f:274            f.truncate(0)275json_Em= Publish_Equipment()276class Test_Em():277    '''设å¤'''278    obj = Requests()279    warnings.simplefilter('ignore', ResourceWarning)280    def get(self,ID):281        with open('ID', "w") as f:282            f.write(str(ID))283    @property284    def read(self):285        '''è·åid'''286        with open('ID', 'r') as f:287            return f.read()288    def test_Em_001(self):289        '''åå¸ç¼è¾é¡¹ç®ä¿¡æ¯'''290        r=self.obj.post(url=url.getUrl_001, json=json_Em.building_001, headers=headers.getheaders)291        print(r.json())292        if r.json()['code']==None or r.json()['data']==None or r.json()['data']==999:293            assert r.json()['code'] == str(0)294        else:295            self.get(r.json()['data'])296    def test_Em_002(self):297        '''åå¸ç¼è¾å让信æ¯'''298        r = self.obj.post(url=url.getUrl_002, json=json_Em.building_002, headers=headers.getheaders)299        assert r.json()['code']==str(0)300        assert r.json()['data'] == int(self.read)301        #assert self.excel.get_Expect(row=row) in json.dumps(r.json(), ensure_ascii=False)302    def test_Em_003(self):303        '''ä¸ä¼ å¾ç'''304        for i in range(10):305            r = self.obj.post(url=url.getUrl_003, files=json_Em.building_file, headers=headers.getheaders_002,json=json.building_003)306            if r.json()['code']==None or r.json()['data']==None or r.json()['data']==999:307                assert r.json()['code']==str(0)308            else:309                self.save_photo(r.json()['data']['fullPath'])310    def save_photo(self,photo):311        '''ä¿åå¾çå°å'''312        with open('photo', 'a') as f:313            f.write(str(photo)+'\n')314    def test_Em_004(self):315        '''åå¸ä¸ä¼ éä»¶ä¿¡æ¯'''316        r = self.obj.post(url=url.getUrl_004,json=json_clf.building_004,headers=headers.getheaders)317        if r.json()['code'] == None or r.json()['data'] == None or r.json()['data'] == 999:318            assert r.json()['code']==str(0)319    def test_Em_005(self):320        '''确认æ¿è¯ºå½'''321        r=self.obj.post(url=url.getUrl_005,json=json_Em.building_005,headers=headers.getheaders)322        assert r.json()['code']==str(0)323    def test_Em_006(self):324        '''æ¸
空photo'''325        with open('photo', 'r+', encoding='utf-8', ) as f:326            f.truncate(0)327if __name__ == '__main__':328    while True:329        try:330            i = int(input('请è¾å
¥éè¦å建ç项ç®ç±»åï¼\n 1.æ¿å±å»ºçç© 2.é使ºå¨å°\n 3.åæ·æ¿å
å° 4.æå°ææ¨\n 5.æ°´é¢ 6.设å¤è®¾æ½\n 7.éåº\n'))331            if i == 1:332                pytest.main(['-s', '-v', 'test_portal_api.py::Test_House'])333                break334            elif i == 2:335                pytest.main(['-s', '-v', 'test_portal_api.py::Test_Cml'])336                break337            elif i == 3:338                pytest.main(['-s', '-v', 'test_portal_api.py::Test_Clf'])339                break340            elif i == 4:341                pytest.main(['-s', '-v', 'test_portal_api.py::Test_Wd'])342                break343            elif i == 5:344                pytest.main(['-s', '-v', 'test_portal_api.py::Test_Wr'])345                break346            elif i == 6:347                pytest.main(['-s', '-v', 'test_portal_api.py::Test_Em'])348                break349            elif i == 7:350                break351            else:352                print('è¾å
¥æ£ç¡®çæ°å')353        except Exception as f:...api.py
Source:api.py  
1import requests2def getHeaders():3    headers = {4            'Content-Type': 'application/json',5            'Accept': 'application/json'6        }7    return headers8def getToken(payload):9    response = requests.post('https://app.skuvault.com/api/gettokens', headers=getHeaders(), json=payload)10    return response.json()11def addItem(payload):12    response = requests.post('https://app.skuvault.com/api/inventory/addItem', headers=getHeaders(), json=payload)13    return response.json()14def addItemBulk(payload):15    response = requests.post('https://app.skuvault.com/api/inventory/addItemBulk', headers=getHeaders(), json=payload)16    return response.json()17def addShipments(payload):18    response = requests.post('https://app.skuvault.com/api/sales/addShipments', headers=getHeaders(), json=payload)19    return response.json()20def createBrands(payload):21    response = requests.post('https://app.skuvault.com/api/products/createBrands', headers=getHeaders(), json=payload)22    return response.json()23def createHolds(payload):24    response = requests.post('https://app.skuvault.com/api/sales/createHolds', headers=getHeaders(), json=payload)25    return response.json()26def createKit(payload):27    response = requests.post('https://app.skuvault.com/api/products/createKit', headers=getHeaders(), json=payload)28    return response.json()29def createPO(payload):30    response = requests.post('https://app.skuvault.com/api/purchaseorders/createPO', headers=getHeaders(), json=payload)31    return response.json()32def createProduct(payload):33    response = requests.post('https://app.skuvault.com/api/products/createProduct', headers=getHeaders(), json=payload)34    return response.json()35def createProducts(payload):36    response = requests.post('https://app.skuvault.com/api/products/createProducts', headers=getHeaders(), json=payload)37    return response.json()38def createSuppliers(payload):39    response = requests.post('https://app.skuvault.com/api/products/createSuppliers', headers=getHeaders(), json=payload)40    return response.json()41def getAvailableQuantities(payload):42    response = requests.post('https://app.skuvault.com/api/inventory/getAvailableQuantities', headers=getHeaders(), json=payload)43    return response.json()44def getBrands(payload):45    response = requests.post('https://app.skuvault.com/api/products/getBrands', headers=getHeaders(), json=payload)46    return response.json()47def getClassifications(payload):48    response = requests.post('https://app.skuvault.com/api/products/getClassifications', headers=getHeaders(), json=payload)49    return response.json()50def getExternalWarehouseQuantities(payload):51    response = requests.post('https://app.skuvault.com/api/inventory/getExternalWarehouseQuantities', headers=getHeaders(), json=payload)52    return response.json()53def getExternalWarehouses(payload):54    response = requests.post('https://app.skuvault.com/api/inventory/getExternalWarehouses', headers=getHeaders(), json=payload)55    return response.json()56def getHandlingTime(payload):57    response = requests.post('https://app.skuvault.com/api/products/getHandlingTime', headers=getHeaders(), json=payload)58    return response.json()59def getIncomingItems(payload):60    response = requests.post('https://app.skuvault.com/api/purchaseorders/getIncomingItems', headers=getHeaders(), json=payload)61    return response.json()62def getIntegrations(payload):63    response = requests.post('https://app.skuvault.com/api/integration/getIntegrations', headers=getHeaders(), json=payload)64    return response.json()65def getInventoryByLocation(payload):66    response = requests.post('https://app.skuvault.com/api/inventory/getInventoryByLocation', headers=getHeaders(), json=payload)67    return response.json()68def getItemQuantities(payload):69    response = requests.post('https://app.skuvault.com/api/inventory/getItemQuantities', headers=getHeaders(), json=payload)70    return response.json()71def getKitQuantities(payload):72    response = requests.post('https://app.skuvault.com/api/inventory/getKitQuantities', headers=getHeaders(), json=payload)73    return response.json()74def getKits(payload):75    response = requests.post('https://app.skuvault.com/api/products/getKits', headers=getHeaders(), json=payload)76    return response.json()77def getLocations(payload):78    response = requests.post('https://app.skuvault.com/api/inventory/getLocations', headers=getHeaders(), json=payload)79    return response.json()80def getOnlineSaleStatus(payload):81    response = requests.post('https://app.skuvault.com/api/sales/getOnlineSaleStatus', headers=getHeaders(), json=payload)82    return response.json()83def getPOs(payload):84    response = requests.post('https://app.skuvault.com/api/purchaseorders/getPOs', headers=getHeaders(), json=payload)85    return response.json()86def getProduct(payload):87    response = requests.post('https://app.skuvault.com/api/products/getProduct', headers=getHeaders(), json=payload)88    return response.json()89def getProducts(payload):90    response = requests.post('https://app.skuvault.com/api/products/getProducts', headers=getHeaders(), json=payload)91    return response.json()92def getSerialNumbers(payload):93    response = requests.post('https://app.skuvault.com/api/inventory/getSerialNumbers', headers=getHeaders(), json=payload)94    return response.json()95def getReceivesHistory(payload):96    response = requests.post('https://app.skuvault.com/api/purchaseorders/getReceivesHistory', headers=getHeaders(), json=payload)97    return response.json()98def getSaleItemCost(payload):99    response = requests.post('https://app.skuvault.com/api/sales/getSaleItemCost', headers=getHeaders(), json=payload)100    return response.json()101def getSales(payload):102    response = requests.post('https://app.skuvault.com/api/sales/getSales', headers=getHeaders(), json=payload)103    return response.json()104def getSalesByDate(payload):105    response = requests.post('https://app.skuvault.com/api/sales/getSalesByDate', headers=getHeaders(), json=payload)106    return response.json()107def getShipments(payload):108    response = requests.post('https://app.skuvault.com/api/sales/getShipments', headers=getHeaders(), json=payload)109    return response.json()110def getSuppliers(payload):111    response = requests.post('https://app.skuvault.com/api/products/getSuppliers', headers=getHeaders(), json=payload)112    return response.json()113def getTransactions(payload):114    response = requests.post('https://app.skuvault.com/api/inventory/getTransactions', headers=getHeaders(), json=payload)115    return response.json()116def getWarehouseItemQuantities(payload):117    response = requests.post('https://app.skuvault.com/api/inventory/getWarehouseItemQuantities', headers=getHeaders(), json=payload)118    return response.json()119def getWarehouseItemQuantity(payload):120    response = requests.post('https://app.skuvault.com/api/inventory/getWarehouseItemQuantity', headers=getHeaders(), json=payload)121    return response.json()122def getWarehouses(payload):123    response = requests.post('https://app.skuvault.com/api/inventory/getWarehouses', headers=getHeaders(), json=payload)124    return response.json()125def pickItem(payload):126    response = requests.post('https://app.skuvault.com/api/inventory/pickItem', headers=getHeaders(), json=payload)127    return response.json()128def pickItemBulk(payload):129    response = requests.post('https://app.skuvault.com/api/inventory/pickItemBulk', headers=getHeaders(), json=payload)130    return response.json()131def receivePOItems(payload):132    response = requests.post('https://app.skuvault.com/api/purchaseorders/receivePOItems', headers=getHeaders(), json=payload)133    return response.json()134def releaseHeldQuantities(payload):135    response = requests.post('https://app.skuvault.com/api/sales/releaseHeldQuantities', headers=getHeaders(), json=payload)136    return response.json()137def removeItem(payload):138    response = requests.post('https://app.skuvault.com/api/inventory/removeItem', headers=getHeaders(), json=payload)139    return response.json()140def removeItemBulk(payload):141    response = requests.post('https://app.skuvault.com/api/inventory/removeItemBulk', headers=getHeaders(), json=payload)142    return response.json()143def setItemQuantities(payload):144    response = requests.post('https://app.skuvault.com/api/inventory/setItemQuantities', headers=getHeaders(), json=payload)145    return response.json()146def setItemQuantity(payload):147    response = requests.post('https://app.skuvault.com/api/inventory/setItemQuantity', headers=getHeaders(), json=payload)148    return response.json()149def setShipmentFile(payload):150    response = requests.post('https://app.skuvault.com/api/sales/setShipmentFile', headers=getHeaders(), json=payload)151    return response.json()152def syncOnlineSale(payload):153    response = requests.post('https://app.skuvault.com/api/sales/syncOnlineSale', headers=getHeaders(), json=payload)154    return response.json()155def syncOnlineSales(payload):156    response = requests.post('https://app.skuvault.com/api/sales/syncOnlineSales', headers=getHeaders(), json=payload)157    return response.json()158def syncShippedSaleAndRemoveItems(payload):159    response = requests.post('https://app.skuvault.com/api/sales/syncShippedSaleAndRemoveItems', headers=getHeaders(), json=payload)160    return response.json()161def syncShippedSaleAndRemoveItemsBulk(payload):162    response = requests.post('https://app.skuvault.com/api/sales/syncShippedSaleAndRemoveItems/bulk', headers=getHeaders(), json=payload)163    return response.json()164def updateAltSKUsCodes(payload):165    response = requests.post('https://app.skuvault.com/api/products/updateAltSKUsCodes', headers=getHeaders(), json=payload)166    return response.json()167def updateExternalWarehouseQuantities(payload):168    response = requests.post('https://app.skuvault.com/api/inventory/updateExternalWarehouseQuantities', headers=getHeaders(), json=payload)169    return response.json()170def updateHandlingTime(payload):171    response = requests.post('https://app.skuvault.com/api/products/updateHandlingTime', headers=getHeaders(), json=payload)172    return response.json()173def updateOnlineSaleStatus(payload):174    response = requests.post('https://app.skuvault.com/api/sales/updateOnlineSaleStatus', headers=getHeaders(), json=payload)175    return response.json()176def updatePOs(payload):177    response = requests.post('https://app.skuvault.com/api/purchaseorders/updatePOs', headers=getHeaders(), json=payload)178    return response.json()179def updateProduct(payload):180    response = requests.post('https://app.skuvault.com/api/products/updateProduct', headers=getHeaders(), json=payload)181    return response.json()182def updateProducts(payload):183    response = requests.post('https://app.skuvault.com/api/products/updateProducts', headers=getHeaders(), json=payload)184    return response.json()185def updateShipments(payload):186    response = requests.post('https://app.skuvault.com/api/sales/updateShipments', headers=getHeaders(), json=payload)...BinStream.py
Source:BinStream.py  
1#-*- coding: utf-82import numpy as np3import struct4from vis_brain.streams.Stream import Stream5_STREAM_CHUNK = "#!vis-brain.data.stream"6_STREAM_CHUNK_SIZE = 2567class BinStream(Stream):8    '''9    Represents so called .bin stream. The .bin stream deals with native vis-brain .bin's streams. All routines10    gave already been declared in the Stream class. This class declares their implementations11    '''12    __handle = None13    def _openForReading(self):14        global _STREAM_CHUNK, _STREAM_CHUNK_SIZE15        self.__handle = open(self._filename, "rb")16        try:17            raw_chunk = self.__handle.read(_STREAM_CHUNK_SIZE)18            chunk = raw_chunk.decode("utf-8").rstrip("\0")19            if chunk != _STREAM_CHUNK:20                raise IncorrectFileFormat()21            raw_header = self.__handle.read(12)22            header = struct.unpack("iii", raw_header)23            self.getHeaders()['height'] = header[0]24            self.getHeaders()['width'] = header[1]25            self.getHeaders()['nframes'] = header[2]26            raw_header = self.__handle.read(24)27            header = struct.unpack("ddd", raw_header)28            self.getHeaders()['height_um'] = header[0]29            self.getHeaders()['width_um'] = header[1]30            self.getHeaders()['sample_rate'] = header[2]31        except Exception as exc:32            self.__handle.close()33            raise exc34    def _openForWriting(self):35        global _STREAM_CHUNK, _STREAM_CHUNK_SIZE36        self.__handle = open(self._filename, "wb")37        try:38            raw_chunk = _STREAM_CHUNK.encode("utf-8")39            raw_chunk += b"\x00" * (_STREAM_CHUNK_SIZE - len(raw_chunk))40            self.__handle.write(raw_chunk)41            integer_header = struct.pack("iii",42                                         self.getHeaders()['height'],43                                         self.getHeaders()['width'],44                                         self.getHeaders()['nframes'])45            float_header = struct.pack("ddd",46                                       self.getHeaders()["height_um"],47                                       self.getHeaders()["width_um"],48                                       self.getHeaders()["sample_rate"])49            self.__handle.write(integer_header)50            self.__handle.write(float_header)51        except Exception as exc:52            self.__handle.close()53            raise exc54    def _read(self):55        height = self.getHeaders()['height']56        width = self.getHeaders()['width']57        size = height * width58        frame_size = size * 859        raw_data = self.__handle.read(frame_size)60        data = struct.unpack("d"*size, raw_data)61        raw_matrix = np.array(data)62        matrix = np.reshape(raw_matrix, (height, width))63        return matrix64    def _move(self, n):65        height = self.getHeaders()['height']66        width = self.getHeaders()['width']67        frameSize = width * height * 868        self.__handle.seek(frameSize * n, 1)69    def _first(self):70        self.__handle.seek(292, 0)71    def _last(self):72        height = self.getHeaders()['height']73        width = self.getHeaders()['width']74        frameSize = width * height * 875        self.__handle.seek(-frameSize, 2)76    def _write(self, matrix):77        content = bytes(matrix)78        self.__handle.write(content)79    def _closeForReading(self):80        self.__handle.close()81    def _closeForWriting(self):82        try:83            self.__handle.seek(264)84            buffer = struct.pack("i", self.getHeaders()["current_frame"])85            self.__handle.write(buffer)86        except Exception as exc:87            self.__handle.close()88            raise exc89        self.__handle.close()90class IncorrectFileFormat(IOError):91    def __init__(self):...p2p_initial_headers_sync.py
Source:p2p_initial_headers_sync.py  
...33    def run_test(self):34        self.log.info("Adding a peer to node0")35        peer1 = self.nodes[0].add_p2p_connection(P2PInterface())36        # Wait for peer1 to receive a getheaders37        peer1.wait_for_getheaders()38        # An empty reply will clear the outstanding getheaders request,39        # allowing additional getheaders requests to be sent to this peer in40        # the future.41        peer1.send_message(msg_headers())42        self.log.info("Connecting two more peers to node0")43        # Connect 2 more peers; they should not receive a getheaders yet44        peer2 = self.nodes[0].add_p2p_connection(P2PInterface())45        peer3 = self.nodes[0].add_p2p_connection(P2PInterface())46        all_peers = [peer1, peer2, peer3]47        self.log.info("Verify that peer2 and peer3 don't receive a getheaders after connecting")48        for p in all_peers:49            p.sync_with_ping()50        with p2p_lock:51            assert "getheaders" not in peer2.last_message52            assert "getheaders" not in peer3.last_message53        with p2p_lock:54            peer1.last_message.pop("getheaders", None)55        self.log.info("Have all peers announce a new block")56        self.announce_random_block(all_peers)57        self.log.info("Check that peer1 receives a getheaders in response")58        peer1.wait_for_getheaders()59        peer1.send_message(msg_headers()) # Send empty response, see above60        with p2p_lock:61            peer1.last_message.pop("getheaders", None)62        self.log.info("Check that exactly 1 of {peer2, peer3} received a getheaders in response")63        count = 064        peer_receiving_getheaders = None65        for p in [peer2, peer3]:66            with p2p_lock:67                if "getheaders" in p.last_message:68                    count += 169                    peer_receiving_getheaders = p70                    p.last_message.pop("getheaders", None)71                    p.send_message(msg_headers()) # Send empty response, see above72        assert_equal(count, 1)73        self.log.info("Announce another new block, from all peers")74        self.announce_random_block(all_peers)75        self.log.info("Check that peer1 receives a getheaders in response")76        peer1.wait_for_getheaders()77        self.log.info("Check that the remaining peer received a getheaders as well")78        expected_peer = peer279        if peer2 == peer_receiving_getheaders:80            expected_peer = peer381        expected_peer.wait_for_getheaders()82        self.log.info("Success!")83if __name__ == '__main__':...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
