How to use getheaders method in tempest

Best Python code snippet using tempest_python

test_portal_api.py

Source:test_portal_api.py Github

copy

Full Screen

1import requests2import json3import unittest4import warnings5from test_partal_data.ProjectData import *6from test_partal_data.getHeaders import Get_headers7from test_partal_data.getUrl import GetUrl8from test_partal_data.method import Requests9import time10import pytest11headers = Get_headers()12url = GetUrl()13json = Publish_Data()14class Test_House():15 '''房屋建筑'''16 obj = Requests()17 warnings.simplefilter('ignore', ResourceWarning)18 def get(self, ID):19 with open('ID', "w") as f:20 f.write(str(ID))21 @property22 def read(self):23 '''获取id'''24 with open('ID', 'r') as f:25 return f.read()26 def test_publish_001(self):27 '''发布编辑项目信息'''28 r = self.obj.post(url=url.getUrl_001, json=json.building_001, headers=headers.getheaders)29 print(r.json())30 if r.json()['code'] == None or r.json()['data'] == None or r.json()['data'] == 999:31 assert r.json()['code'] == str(0)32 else:33 self.get(r.json()['data'])34 def test_publish_002(self):35 '''发布编辑受让信息'''36 r = self.obj.post(url=url.getUrl_002, json=json.building_002, headers=headers.getheaders)37 assert r.json()['code'] == str(0)38 assert r.json()['data'] == int(self.read)39 def test_publish_003(self):40 '''上传图片'''41 for i in range(10):42 r = self.obj.post(url=url.getUrl_003, files=json.building_file, headers=headers.getheaders_002,43 json=json.building_003)44 if r.json()['code'] == None or r.json()['data'] == None or r.json()['data'] == 999:45 assert r.json()['code'] == str(0)46 else:47 self.save_photo(r.json()['data']['fullPath'])48 def save_photo(self, photo):49 '''保存图片地址'''50 with open('photo', 'a') as f:51 f.write(str(photo) + '\n')52 def test_publish_004(self):53 '''发布上传附件信息'''54 r = self.obj.post(url=url.getUrl_004, json=json_clf.building_004, headers=headers.getheaders)55 if r.json()['code'] == None or r.json()['data'] == None or r.json()['data'] == 999:56 assert r.json()['code'] == str(0)57 def test_publish_005(self):58 '''确认承诺函'''59 r = self.obj.post(url=url.getUrl_005, json=json.building_005, headers=headers.getheaders)60 assert r.json()['code'] == str(0)61 def test_publish_006(self):62 '''清空photo'''63 with open('photo', 'r+', encoding='utf-8', ) as f:64 f.truncate(0)65json_Cml= Publish_Cml()66class Test_Cml():67 '''集体机动地'''68 obj = Requests()69 warnings.simplefilter('ignore', ResourceWarning)70 def get(self,ID):71 with open('ID', "w") as f:72 f.write(str(ID))73 @property74 def read(self):75 '''获取id'''76 with open('ID', 'r') as f:77 return f.read()78 def test_Cml_001(self):79 '''发布编辑项目信息'''80 r=self.obj.post(url=url.getUrl_001, json=json_Cml.building_001, headers=headers.getheaders)81 print(r.json())82 if r.json()['code']==None or r.json()['data']==None or r.json()['data']==999:83 assert r.json()['code'] == str(0)84 else:85 self.get(r.json()['data'])86 def test_Cml_002(self):87 '''发布编辑受让信息'''88 r = self.obj.post(url=url.getUrl_002, json=json_Cml.building_002, headers=headers.getheaders)89 assert r.json()['code'] == str(0)90 assert r.json()['data'] == int(self.read)91 def test_Cml_003(self):92 '''上传图片'''93 for i in range(10):94 r = self.obj.post(url=url.getUrl_003, files=json_Cml.building_file, headers=headers.getheaders_002,json=json.building_003)95 if r.json()['code']==None or r.json()['data']==None or r.json()['data']==999:96 assert r.json()['code'] == str(0)97 else:98 self.save_photo(r.json()['data']['fullPath'])99 #time.sleep(2)100 def save_photo(self,photo):101 '''保存图片地址'''102 with open('photo', 'a') as f:103 f.write(str(photo)+'\n')104 def test_Cml_004(self):105 '''发布上传附件信息'''106 r = self.obj.post(url=url.getUrl_004,data=json_clf.building_004,headers=headers.getheaders)107 #print(json_Cml.building_004)108 #print(type(json_Cml.building_004))109 print(r.json())110 if r.json()['code'] == None or r.json()['data'] == None or r.json()['data'] == 999:111 assert r.json()['code'] == str(0)112 def test_Cml_005(self):113 '''确认承诺函'''114 r=self.obj.post(url=url.getUrl_005,json=json_Cml.building_005,headers=headers.getheaders)115 assert r.json()['code'] == str(0)116 # def test_Cml_006(self):117 # '''清空photo'''118 # with open('photo', 'r+', encoding='utf-8', ) as f:119 # f.truncate(0)120json_clf = Publish_Clf()121class Test_Clf():122 '''农户承包地'''123 obj = Requests()124 warnings.simplefilter('ignore', ResourceWarning)125 def get(self,ID):126 with open('ID', "w") as f:127 f.write(str(ID))128 @property129 def read(self):130 '''获取id'''131 with open('ID', 'r') as f:132 return f.read()133 def test_clf_001(self):134 '''发布编辑项目信息'''135 r=self.obj.post(url=url.getUrl_001, json=json_clf.clf_001, headers=headers.getheaders)136 if r.json()['code']==None or r.json()['data']==None or r.json()['data']==999:137 assert r.json()['code'] == str(0)138 else:139 self.get(r.json()['data'])140 def test_clf_002(self):141 '''发布编辑受让信息'''142 r = self.obj.post(url=url.getUrl_002, json=json_clf.clf_002, headers=headers.getheaders)143 assert r.json()['code'] == str(0)144 assert r.json()['data'] == int(self.read)145 def test_clf_003(self):146 '''上传图片'''147 for i in range(10):148 r = self.obj.post(url=url.getUrl_003, files=json_clf.building_file, headers=headers.getheaders_002,json=json_clf.clf_003)149 if r.json()['code']==None or r.json()['data']==None or r.json()['data']==999:150 assert r.json()['code'] == str(0)151 else:152 self.save_photo(r.json()['data']['fullPath'])153 def save_photo(self,photo):154 '''保存图片地址'''155 with open('photo', 'a') as f:156 f.write(str(photo)+'\n')157 def test_clf_004(self):158 '''发布上传附件信息'''159 r = self.obj.post(url=url.getUrl_004,json=json_clf.clf_004,headers=headers.getheaders)160 print(r.json())161 if r.json()['code'] == None or r.json()['data'] == None or r.json()['data'] == 999:162 assert r.json()['code'] == str(0)163 def test_clf_005(self):164 '''确认承诺函'''165 r=self.obj.post(url=url.getUrl_005,json=json_clf.clf_005,headers=headers.getheaders)166 assert r.json()['code'] == str(0)167 def test_clf_006(self):168 '''清空photo'''169 with open('photo', 'r+', encoding='utf-8', ) as f:170 f.truncate(0)171json_wd = Publish_Woodland()172class Test_Wd():173 '''林地'''174 obj = Requests()175 warnings.simplefilter('ignore', ResourceWarning)176 def get(self,ID):177 with open('ID', "w") as f:178 f.write(str(ID))179 @property180 def read(self):181 '''获取id'''182 with open('ID', 'r') as f:183 return f.read()184 def test_wd_001(self):185 '''发布编辑项目信息'''186 r=self.obj.post(url=url.getUrl_001, json=json_wd.building_001, headers=headers.getheaders)187 print(r.json())188 if r.json()['code']==None or r.json()['data']==None or r.json()['data']==999:189 assert r.json()['code'] == str(0)190 else:191 self.get(r.json()['data'])192 def test_wd_002(self):193 '''发布编辑受让信息'''194 r = self.obj.post(url=url.getUrl_002, json=json_wd.building_002, headers=headers.getheaders)195 assert r.json()['code'] == str(0)196 assert r.json()['data'] == int(self.read)197 def test_wd_003(self):198 '''上传图片'''199 for i in range(10):200 r = self.obj.post(url=url.getUrl_003, files=json_wd.building_file, headers=headers.getheaders_002,json=json.building_003)201 print(r.json())202 if r.json()['code']==None or r.json()['data']==None or r.json()['data']==999:203 assert r.json()['code'] == str(0)204 else:205 self.save_photo(r.json()['data']['fullPath'])206 def save_photo(self,photo):207 '''保存图片地址'''208 with open('photo', 'a') as f:209 f.write(str(photo)+'\n')210 def test_wd_004(self):211 '''发布上传附件信息'''212 r = self.obj.post(url=url.getUrl_004,json=json_clf.building_004,headers=headers.getheaders)213 print(r.json())214 if r.json()['code'] == None or r.json()['data'] == None or r.json()['data'] == 999:215 assert r.json()['code'] == str(0)216 def test_wd_005(self):217 '''确认承诺函'''218 r=self.obj.post(url=url.getUrl_005,json=json_wd.building_005,headers=headers.getheaders)219 assert r.json()['code'] == str(0)220 def test_publish_006(self):221 '''清空photo'''222 with open('photo', 'r+', encoding='utf-8', ) as f:223 f.truncate(0)224json_wr= Publish_water()225class Test_Wr():226 '''水面'''227 obj = Requests()228 warnings.simplefilter('ignore', ResourceWarning)229 def get(self,ID):230 with open('ID', "w") as f:231 f.write(str(ID))232 @property233 def read(self):234 '''获取id'''235 with open('ID', 'r') as f:236 return f.read()237 def test_wr_001(self):238 '''发布编辑项目信息'''239 r=self.obj.post(url=url.getUrl_001, json=json_wr.building_001, headers=headers.getheaders)240 print(r.json())241 if r.json()['code']==None or r.json()['data']==None or r.json()['data']==999:242 assert r.json()['code'] == str(0)243 else:244 self.get(r.json()['data'])245 def test_wr_002(self):246 '''发布编辑受让信息'''247 r = self.obj.post(url=url.getUrl_002, json=json_wr.building_002, headers=headers.getheaders)248 assert r.json()['code'] == str(0)249 assert r.json()['data'] == int(self.read)250 def test_wr_003(self):251 '''上传图片'''252 for i in range(10):253 r = self.obj.post(url=url.getUrl_003, files=json_wr.building_file, headers=headers.getheaders_002,json=json.building_003)254 if r.json()['code']==None or r.json()['data']==None or r.json()['data']==999:255 assert r.json()['code'] == str(0)256 else:257 self.save_photo(r.json()['data']['fullPath'])258 def save_photo(self,photo):259 '''保存图片地址'''260 with open('photo', 'a') as f:261 f.write(str(photo)+'\n')262 def test_wr_004(self):263 '''发布上传附件信息'''264 r = self.obj.post(url=url.getUrl_004,json=json_clf.building_004,headers=headers.getheaders)265 if r.json()['code'] == None or r.json()['data'] == None or r.json()['data'] == 999:266 assert r.json()['code'] == str(0)267 def test_wr_005(self):268 '''确认承诺函'''269 r=self.obj.post(url=url.getUrl_005,json=json_wr.building_005,headers=headers.getheaders)270 assert r.json()['code'] == str(0)271 def test_wr_006(self):272 '''清空photo'''273 with open('photo', 'r+', encoding='utf-8', ) as f:274 f.truncate(0)275json_Em= Publish_Equipment()276class Test_Em():277 '''设备'''278 obj = Requests()279 warnings.simplefilter('ignore', ResourceWarning)280 def get(self,ID):281 with open('ID', "w") as f:282 f.write(str(ID))283 @property284 def read(self):285 '''获取id'''286 with open('ID', 'r') as f:287 return f.read()288 def test_Em_001(self):289 '''发布编辑项目信息'''290 r=self.obj.post(url=url.getUrl_001, json=json_Em.building_001, headers=headers.getheaders)291 print(r.json())292 if r.json()['code']==None or r.json()['data']==None or r.json()['data']==999:293 assert r.json()['code'] == str(0)294 else:295 self.get(r.json()['data'])296 def test_Em_002(self):297 '''发布编辑受让信息'''298 r = self.obj.post(url=url.getUrl_002, json=json_Em.building_002, headers=headers.getheaders)299 assert r.json()['code']==str(0)300 assert r.json()['data'] == int(self.read)301 #assert self.excel.get_Expect(row=row) in json.dumps(r.json(), ensure_ascii=False)302 def test_Em_003(self):303 '''上传图片'''304 for i in range(10):305 r = self.obj.post(url=url.getUrl_003, files=json_Em.building_file, headers=headers.getheaders_002,json=json.building_003)306 if r.json()['code']==None or r.json()['data']==None or r.json()['data']==999:307 assert r.json()['code']==str(0)308 else:309 self.save_photo(r.json()['data']['fullPath'])310 def save_photo(self,photo):311 '''保存图片地址'''312 with open('photo', 'a') as f:313 f.write(str(photo)+'\n')314 def test_Em_004(self):315 '''发布上传附件信息'''316 r = self.obj.post(url=url.getUrl_004,json=json_clf.building_004,headers=headers.getheaders)317 if r.json()['code'] == None or r.json()['data'] == None or r.json()['data'] == 999:318 assert r.json()['code']==str(0)319 def test_Em_005(self):320 '''确认承诺函'''321 r=self.obj.post(url=url.getUrl_005,json=json_Em.building_005,headers=headers.getheaders)322 assert r.json()['code']==str(0)323 def test_Em_006(self):324 '''清空photo'''325 with open('photo', 'r+', encoding='utf-8', ) as f:326 f.truncate(0)327if __name__ == '__main__':328 while True:329 try:330 i = int(input('请输入需要创建的项目类型:\n 1.房屋建筑物 2.集体机动地\n 3.农户承包地 4.林地林木\n 5.水面 6.设备设施\n 7.退出\n'))331 if i == 1:332 pytest.main(['-s', '-v', 'test_portal_api.py::Test_House'])333 break334 elif i == 2:335 pytest.main(['-s', '-v', 'test_portal_api.py::Test_Cml'])336 break337 elif i == 3:338 pytest.main(['-s', '-v', 'test_portal_api.py::Test_Clf'])339 break340 elif i == 4:341 pytest.main(['-s', '-v', 'test_portal_api.py::Test_Wd'])342 break343 elif i == 5:344 pytest.main(['-s', '-v', 'test_portal_api.py::Test_Wr'])345 break346 elif i == 6:347 pytest.main(['-s', '-v', 'test_portal_api.py::Test_Em'])348 break349 elif i == 7:350 break351 else:352 print('输入正确的数字')353 except Exception as f:...

Full Screen

Full Screen

api.py

Source:api.py Github

copy

Full Screen

1import requests2def getHeaders():3 headers = {4 'Content-Type': 'application/json',5 'Accept': 'application/json'6 }7 return headers8def getToken(payload):9 response = requests.post('https://app.skuvault.com/api/gettokens', headers=getHeaders(), json=payload)10 return response.json()11def addItem(payload):12 response = requests.post('https://app.skuvault.com/api/inventory/addItem', headers=getHeaders(), json=payload)13 return response.json()14def addItemBulk(payload):15 response = requests.post('https://app.skuvault.com/api/inventory/addItemBulk', headers=getHeaders(), json=payload)16 return response.json()17def addShipments(payload):18 response = requests.post('https://app.skuvault.com/api/sales/addShipments', headers=getHeaders(), json=payload)19 return response.json()20def createBrands(payload):21 response = requests.post('https://app.skuvault.com/api/products/createBrands', headers=getHeaders(), json=payload)22 return response.json()23def createHolds(payload):24 response = requests.post('https://app.skuvault.com/api/sales/createHolds', headers=getHeaders(), json=payload)25 return response.json()26def createKit(payload):27 response = requests.post('https://app.skuvault.com/api/products/createKit', headers=getHeaders(), json=payload)28 return response.json()29def createPO(payload):30 response = requests.post('https://app.skuvault.com/api/purchaseorders/createPO', headers=getHeaders(), json=payload)31 return response.json()32def createProduct(payload):33 response = requests.post('https://app.skuvault.com/api/products/createProduct', headers=getHeaders(), json=payload)34 return response.json()35def createProducts(payload):36 response = requests.post('https://app.skuvault.com/api/products/createProducts', headers=getHeaders(), json=payload)37 return response.json()38def createSuppliers(payload):39 response = requests.post('https://app.skuvault.com/api/products/createSuppliers', headers=getHeaders(), json=payload)40 return response.json()41def getAvailableQuantities(payload):42 response = requests.post('https://app.skuvault.com/api/inventory/getAvailableQuantities', headers=getHeaders(), json=payload)43 return response.json()44def getBrands(payload):45 response = requests.post('https://app.skuvault.com/api/products/getBrands', headers=getHeaders(), json=payload)46 return response.json()47def getClassifications(payload):48 response = requests.post('https://app.skuvault.com/api/products/getClassifications', headers=getHeaders(), json=payload)49 return response.json()50def getExternalWarehouseQuantities(payload):51 response = requests.post('https://app.skuvault.com/api/inventory/getExternalWarehouseQuantities', headers=getHeaders(), json=payload)52 return response.json()53def getExternalWarehouses(payload):54 response = requests.post('https://app.skuvault.com/api/inventory/getExternalWarehouses', headers=getHeaders(), json=payload)55 return response.json()56def getHandlingTime(payload):57 response = requests.post('https://app.skuvault.com/api/products/getHandlingTime', headers=getHeaders(), json=payload)58 return response.json()59def getIncomingItems(payload):60 response = requests.post('https://app.skuvault.com/api/purchaseorders/getIncomingItems', headers=getHeaders(), json=payload)61 return response.json()62def getIntegrations(payload):63 response = requests.post('https://app.skuvault.com/api/integration/getIntegrations', headers=getHeaders(), json=payload)64 return response.json()65def getInventoryByLocation(payload):66 response = requests.post('https://app.skuvault.com/api/inventory/getInventoryByLocation', headers=getHeaders(), json=payload)67 return response.json()68def getItemQuantities(payload):69 response = requests.post('https://app.skuvault.com/api/inventory/getItemQuantities', headers=getHeaders(), json=payload)70 return response.json()71def getKitQuantities(payload):72 response = requests.post('https://app.skuvault.com/api/inventory/getKitQuantities', headers=getHeaders(), json=payload)73 return response.json()74def getKits(payload):75 response = requests.post('https://app.skuvault.com/api/products/getKits', headers=getHeaders(), json=payload)76 return response.json()77def getLocations(payload):78 response = requests.post('https://app.skuvault.com/api/inventory/getLocations', headers=getHeaders(), json=payload)79 return response.json()80def getOnlineSaleStatus(payload):81 response = requests.post('https://app.skuvault.com/api/sales/getOnlineSaleStatus', headers=getHeaders(), json=payload)82 return response.json()83def getPOs(payload):84 response = requests.post('https://app.skuvault.com/api/purchaseorders/getPOs', headers=getHeaders(), json=payload)85 return response.json()86def getProduct(payload):87 response = requests.post('https://app.skuvault.com/api/products/getProduct', headers=getHeaders(), json=payload)88 return response.json()89def getProducts(payload):90 response = requests.post('https://app.skuvault.com/api/products/getProducts', headers=getHeaders(), json=payload)91 return response.json()92def getSerialNumbers(payload):93 response = requests.post('https://app.skuvault.com/api/inventory/getSerialNumbers', headers=getHeaders(), json=payload)94 return response.json()95def getReceivesHistory(payload):96 response = requests.post('https://app.skuvault.com/api/purchaseorders/getReceivesHistory', headers=getHeaders(), json=payload)97 return response.json()98def getSaleItemCost(payload):99 response = requests.post('https://app.skuvault.com/api/sales/getSaleItemCost', headers=getHeaders(), json=payload)100 return response.json()101def getSales(payload):102 response = requests.post('https://app.skuvault.com/api/sales/getSales', headers=getHeaders(), json=payload)103 return response.json()104def getSalesByDate(payload):105 response = requests.post('https://app.skuvault.com/api/sales/getSalesByDate', headers=getHeaders(), json=payload)106 return response.json()107def getShipments(payload):108 response = requests.post('https://app.skuvault.com/api/sales/getShipments', headers=getHeaders(), json=payload)109 return response.json()110def getSuppliers(payload):111 response = requests.post('https://app.skuvault.com/api/products/getSuppliers', headers=getHeaders(), json=payload)112 return response.json()113def getTransactions(payload):114 response = requests.post('https://app.skuvault.com/api/inventory/getTransactions', headers=getHeaders(), json=payload)115 return response.json()116def getWarehouseItemQuantities(payload):117 response = requests.post('https://app.skuvault.com/api/inventory/getWarehouseItemQuantities', headers=getHeaders(), json=payload)118 return response.json()119def getWarehouseItemQuantity(payload):120 response = requests.post('https://app.skuvault.com/api/inventory/getWarehouseItemQuantity', headers=getHeaders(), json=payload)121 return response.json()122def getWarehouses(payload):123 response = requests.post('https://app.skuvault.com/api/inventory/getWarehouses', headers=getHeaders(), json=payload)124 return response.json()125def pickItem(payload):126 response = requests.post('https://app.skuvault.com/api/inventory/pickItem', headers=getHeaders(), json=payload)127 return response.json()128def pickItemBulk(payload):129 response = requests.post('https://app.skuvault.com/api/inventory/pickItemBulk', headers=getHeaders(), json=payload)130 return response.json()131def receivePOItems(payload):132 response = requests.post('https://app.skuvault.com/api/purchaseorders/receivePOItems', headers=getHeaders(), json=payload)133 return response.json()134def releaseHeldQuantities(payload):135 response = requests.post('https://app.skuvault.com/api/sales/releaseHeldQuantities', headers=getHeaders(), json=payload)136 return response.json()137def removeItem(payload):138 response = requests.post('https://app.skuvault.com/api/inventory/removeItem', headers=getHeaders(), json=payload)139 return response.json()140def removeItemBulk(payload):141 response = requests.post('https://app.skuvault.com/api/inventory/removeItemBulk', headers=getHeaders(), json=payload)142 return response.json()143def setItemQuantities(payload):144 response = requests.post('https://app.skuvault.com/api/inventory/setItemQuantities', headers=getHeaders(), json=payload)145 return response.json()146def setItemQuantity(payload):147 response = requests.post('https://app.skuvault.com/api/inventory/setItemQuantity', headers=getHeaders(), json=payload)148 return response.json()149def setShipmentFile(payload):150 response = requests.post('https://app.skuvault.com/api/sales/setShipmentFile', headers=getHeaders(), json=payload)151 return response.json()152def syncOnlineSale(payload):153 response = requests.post('https://app.skuvault.com/api/sales/syncOnlineSale', headers=getHeaders(), json=payload)154 return response.json()155def syncOnlineSales(payload):156 response = requests.post('https://app.skuvault.com/api/sales/syncOnlineSales', headers=getHeaders(), json=payload)157 return response.json()158def syncShippedSaleAndRemoveItems(payload):159 response = requests.post('https://app.skuvault.com/api/sales/syncShippedSaleAndRemoveItems', headers=getHeaders(), json=payload)160 return response.json()161def syncShippedSaleAndRemoveItemsBulk(payload):162 response = requests.post('https://app.skuvault.com/api/sales/syncShippedSaleAndRemoveItems/bulk', headers=getHeaders(), json=payload)163 return response.json()164def updateAltSKUsCodes(payload):165 response = requests.post('https://app.skuvault.com/api/products/updateAltSKUsCodes', headers=getHeaders(), json=payload)166 return response.json()167def updateExternalWarehouseQuantities(payload):168 response = requests.post('https://app.skuvault.com/api/inventory/updateExternalWarehouseQuantities', headers=getHeaders(), json=payload)169 return response.json()170def updateHandlingTime(payload):171 response = requests.post('https://app.skuvault.com/api/products/updateHandlingTime', headers=getHeaders(), json=payload)172 return response.json()173def updateOnlineSaleStatus(payload):174 response = requests.post('https://app.skuvault.com/api/sales/updateOnlineSaleStatus', headers=getHeaders(), json=payload)175 return response.json()176def updatePOs(payload):177 response = requests.post('https://app.skuvault.com/api/purchaseorders/updatePOs', headers=getHeaders(), json=payload)178 return response.json()179def updateProduct(payload):180 response = requests.post('https://app.skuvault.com/api/products/updateProduct', headers=getHeaders(), json=payload)181 return response.json()182def updateProducts(payload):183 response = requests.post('https://app.skuvault.com/api/products/updateProducts', headers=getHeaders(), json=payload)184 return response.json()185def updateShipments(payload):186 response = requests.post('https://app.skuvault.com/api/sales/updateShipments', headers=getHeaders(), json=payload)...

Full Screen

Full Screen

BinStream.py

Source:BinStream.py Github

copy

Full Screen

1#-*- coding: utf-82import numpy as np3import struct4from vis_brain.streams.Stream import Stream5_STREAM_CHUNK = "#!vis-brain.data.stream"6_STREAM_CHUNK_SIZE = 2567class BinStream(Stream):8 '''9 Represents so called .bin stream. The .bin stream deals with native vis-brain .bin's streams. All routines10 gave already been declared in the Stream class. This class declares their implementations11 '''12 __handle = None13 def _openForReading(self):14 global _STREAM_CHUNK, _STREAM_CHUNK_SIZE15 self.__handle = open(self._filename, "rb")16 try:17 raw_chunk = self.__handle.read(_STREAM_CHUNK_SIZE)18 chunk = raw_chunk.decode("utf-8").rstrip("\0")19 if chunk != _STREAM_CHUNK:20 raise IncorrectFileFormat()21 raw_header = self.__handle.read(12)22 header = struct.unpack("iii", raw_header)23 self.getHeaders()['height'] = header[0]24 self.getHeaders()['width'] = header[1]25 self.getHeaders()['nframes'] = header[2]26 raw_header = self.__handle.read(24)27 header = struct.unpack("ddd", raw_header)28 self.getHeaders()['height_um'] = header[0]29 self.getHeaders()['width_um'] = header[1]30 self.getHeaders()['sample_rate'] = header[2]31 except Exception as exc:32 self.__handle.close()33 raise exc34 def _openForWriting(self):35 global _STREAM_CHUNK, _STREAM_CHUNK_SIZE36 self.__handle = open(self._filename, "wb")37 try:38 raw_chunk = _STREAM_CHUNK.encode("utf-8")39 raw_chunk += b"\x00" * (_STREAM_CHUNK_SIZE - len(raw_chunk))40 self.__handle.write(raw_chunk)41 integer_header = struct.pack("iii",42 self.getHeaders()['height'],43 self.getHeaders()['width'],44 self.getHeaders()['nframes'])45 float_header = struct.pack("ddd",46 self.getHeaders()["height_um"],47 self.getHeaders()["width_um"],48 self.getHeaders()["sample_rate"])49 self.__handle.write(integer_header)50 self.__handle.write(float_header)51 except Exception as exc:52 self.__handle.close()53 raise exc54 def _read(self):55 height = self.getHeaders()['height']56 width = self.getHeaders()['width']57 size = height * width58 frame_size = size * 859 raw_data = self.__handle.read(frame_size)60 data = struct.unpack("d"*size, raw_data)61 raw_matrix = np.array(data)62 matrix = np.reshape(raw_matrix, (height, width))63 return matrix64 def _move(self, n):65 height = self.getHeaders()['height']66 width = self.getHeaders()['width']67 frameSize = width * height * 868 self.__handle.seek(frameSize * n, 1)69 def _first(self):70 self.__handle.seek(292, 0)71 def _last(self):72 height = self.getHeaders()['height']73 width = self.getHeaders()['width']74 frameSize = width * height * 875 self.__handle.seek(-frameSize, 2)76 def _write(self, matrix):77 content = bytes(matrix)78 self.__handle.write(content)79 def _closeForReading(self):80 self.__handle.close()81 def _closeForWriting(self):82 try:83 self.__handle.seek(264)84 buffer = struct.pack("i", self.getHeaders()["current_frame"])85 self.__handle.write(buffer)86 except Exception as exc:87 self.__handle.close()88 raise exc89 self.__handle.close()90class IncorrectFileFormat(IOError):91 def __init__(self):...

Full Screen

Full Screen

p2p_initial_headers_sync.py

Source:p2p_initial_headers_sync.py Github

copy

Full Screen

...33 def run_test(self):34 self.log.info("Adding a peer to node0")35 peer1 = self.nodes[0].add_p2p_connection(P2PInterface())36 # Wait for peer1 to receive a getheaders37 peer1.wait_for_getheaders()38 # An empty reply will clear the outstanding getheaders request,39 # allowing additional getheaders requests to be sent to this peer in40 # the future.41 peer1.send_message(msg_headers())42 self.log.info("Connecting two more peers to node0")43 # Connect 2 more peers; they should not receive a getheaders yet44 peer2 = self.nodes[0].add_p2p_connection(P2PInterface())45 peer3 = self.nodes[0].add_p2p_connection(P2PInterface())46 all_peers = [peer1, peer2, peer3]47 self.log.info("Verify that peer2 and peer3 don't receive a getheaders after connecting")48 for p in all_peers:49 p.sync_with_ping()50 with p2p_lock:51 assert "getheaders" not in peer2.last_message52 assert "getheaders" not in peer3.last_message53 with p2p_lock:54 peer1.last_message.pop("getheaders", None)55 self.log.info("Have all peers announce a new block")56 self.announce_random_block(all_peers)57 self.log.info("Check that peer1 receives a getheaders in response")58 peer1.wait_for_getheaders()59 peer1.send_message(msg_headers()) # Send empty response, see above60 with p2p_lock:61 peer1.last_message.pop("getheaders", None)62 self.log.info("Check that exactly 1 of {peer2, peer3} received a getheaders in response")63 count = 064 peer_receiving_getheaders = None65 for p in [peer2, peer3]:66 with p2p_lock:67 if "getheaders" in p.last_message:68 count += 169 peer_receiving_getheaders = p70 p.last_message.pop("getheaders", None)71 p.send_message(msg_headers()) # Send empty response, see above72 assert_equal(count, 1)73 self.log.info("Announce another new block, from all peers")74 self.announce_random_block(all_peers)75 self.log.info("Check that peer1 receives a getheaders in response")76 peer1.wait_for_getheaders()77 self.log.info("Check that the remaining peer received a getheaders as well")78 expected_peer = peer279 if peer2 == peer_receiving_getheaders:80 expected_peer = peer381 expected_peer.wait_for_getheaders()82 self.log.info("Success!")83if __name__ == '__main__':...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run tempest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful