Best Python code snippet using locust
views.py
Source:views.py  
1from django.http import HttpResponse, JsonResponse2from django.shortcuts import render, get_object_or_4043from django.views.generic import View4from django.core.exceptions import ValidationError5from django.contrib.auth import hashers6from django.views.decorators.csrf import csrf_exempt7from django.utils.decorators import method_decorator8from datetime import datetime, timezone9import json10from fafs_api.models import User, Address, School, Category, Product, Transaction, Authenticator11def get_key(dictionary, key):12	try:13		return dictionary[key]14	except KeyError:15		return None16def retrieve_all_fields(dictionary, field_list):17	return_dict = {}18	for field in field_list:19		value = get_key(dictionary, field)20		return_dict[field] = value21	return return_dict22def get_object_or_none(model, **kwargs):23	try:24		return model.objects.get(**kwargs)25	except model.DoesNotExist:26		return None27def json_encode_dict_and_status(dictionary, status):28	response_dict = {}29	response_dict["status"] = status30	response_dict["response"] = dictionary31	return response_dict32class AuthView(View):33	model = Authenticator34	required_fields = ['user_id']35	@method_decorator(csrf_exempt)36	def dispatch(self, request, *args, **kwargs):37		return super(AuthView, self).dispatch(request, *args, **kwargs)38	def get(self, request, token=None):39		status = False40		if token is not None:41			queryset = get_object_or_none(42				self.model,43				token=token44			)45			if queryset is not None:46				status = True47				json_data = {48					"token": queryset.token,49					"user_id":queryset.user.pk,50					"date_created":queryset.date_created51				}52			else:53				status = False54				json_data = {"message": "No authenticator with token found"}55		else:56			status = True57			queryset = self.model.objects.all()58			json_data = list(queryset.values('token', 'user_id', 'date_created'))59		return JsonResponse(json_encode_dict_and_status(json_data, status))60	def post(self, request):61		json_data = json.loads(request.body.decode('utf-8'))62		field_dict = retrieve_all_fields(63						json_data,64						self.required_fields65		)66		try:67			user = User.objects.get(pk=field_dict['user_id'])68			auth = Authenticator()69			auth.user = user70			auth.save()71			response_data = {72				"token": auth.token,73				"user_id": auth.user.pk,74				"date_created": auth.date_created75			}76			status = True77		except User.DoesNotExist:78			status = False79			response_data = {"message": "Invalid user id"}80		return JsonResponse(json_encode_dict_and_status(response_data, status))81	def delete(self, request, token=None):82		status = False83		obj = get_object_or_none(self.model, token=token)84		if obj is not None:85			obj.delete()86			status = True87		return JsonResponse(json_encode_dict_and_status({},status))88@method_decorator(csrf_exempt)89def auth_check(request):90	required_fields = ['authenticator']91	if request.method == "POST":92		json_data = json.loads(request.body.decode('utf-8'))93		field_dict = retrieve_all_fields(94			json_data,95			required_fields96		)97		status = False98		try:99			auth = Authenticator.objects.get(token=field_dict['authenticator'])100			user_id = auth.user.pk101			status = True102			response_data = {"user_id": user_id}103		except Authenticator.DoesNotExist:104			response_data = {"message": "Invalid authenticator"}105		return JsonResponse(json_encode_dict_and_status(response_data, status))106@method_decorator(csrf_exempt)107def users_check_pass(request):108	required_fields = ['email', 'password']109	if request.method == "POST":110		json_data = json.loads(request.body.decode('utf-8'))111		field_dict = retrieve_all_fields(112			json_data,113			required_fields114		)115		status = False116		response_data = {"message": "Incorrect email/password"}117		try:118			login_user = User.objects.get(email=field_dict['email'])119			# Check password120			hashed_password = login_user.password121			if hashers.check_password(field_dict['password'], hashed_password):122				status = True123				response_data = {"user_id": login_user.pk}124		except User.DoesNotExist:125			pass126		return JsonResponse(json_encode_dict_and_status(response_data, status))127class UserView(View):128	required_fields = ['email', 'school_id', 'password']129	update_fields = ['user_pk','email','password']130	model = User131	@method_decorator(csrf_exempt)132	def dispatch(self, request, *args, **kwargs):133		return super(UserView, self).dispatch(request, *args, **kwargs)134	def get(self, request, pk=None):135		status = False136		if pk is not None:137			queryset = get_object_or_none(138				self.model,139				pk=pk140				)141			if queryset is not None:142				status = True143				json_data = {144					"pk":queryset.pk,145					"email":queryset.email,146					"school_id":queryset.school_id.pk,147					"password":queryset.password148					}149			else:150				status = False151				json_data = {}152		else:153			status = True154			queryset = self.model.objects.all()155			json_data = list(queryset.values('pk','email','school_id', 'password'))156		return JsonResponse(json_encode_dict_and_status(json_data, status))157	def post(self, request):158		status = True159		json_data = json.loads(request.body.decode('utf-8'))160		field_dict = retrieve_all_fields(161			json_data,162			self.required_fields163			)164		try:165			user = self.model.objects.create_user(166				email=field_dict['email'],167				password=field_dict['password'],168				school=field_dict['school_id']169				)170			json_data = {171				"pk":user.pk,172				"email":user.email,173				"school_id":user.school_id.pk174				}175		except ValidationError as e:176			json_data = e.message_dict177			status = False178		return JsonResponse(json_encode_dict_and_status(json_data, status))179	def patch(self, request):180		status = True181		json_data = json.loads(request.body.decode('utf-8'))182		field_dict = retrieve_all_fields(183			json_data,184			self.update_fields185			)186		try:187			user = self.model.objects.update_user(188				user_pk = get_key(field_dict,'user_pk'),189				email = get_key(field_dict,'email'),190				password = get_key(field_dict,'password')191				)192			json_data = {193				"pk":user.pk,194				"email":user.email,195				"school_id":user.school_id.pk196				}197		except ValidationError as e:198			json_data = e.message_dict199			status = False200		return JsonResponse(json_encode_dict_and_status(json_data, status))201	def delete(self, request, pk=None):202		status = False203		obj = get_object_or_none(self.model, pk=pk)204		if obj is not None:205			obj.delete()206			status = True207		return JsonResponse(json_encode_dict_and_status({},status))208class AddressView(View):209	required_fields = ['street_number', 'street_name', 'city', 'state', 'zipcode', 'description']210	model = Address211	@method_decorator(csrf_exempt)212	def dispatch(self, request, *args, **kwargs):213		return super(AddressView, self).dispatch(request, *args, **kwargs)214	def get(self, request, pk=None):215		status = True216		if pk is not None:217			queryset = get_object_or_none(218				self.model,219				pk=pk220			)221			if queryset is not None:222				json_data = {223					"pk": queryset.pk,224					"street_number": queryset.street_number,225					"street_name": queryset.street_name,226					"city": queryset.city,227					"state": queryset.state,228					"zipcode": queryset.zipcode,229					"description": queryset.description,230					"address_2": queryset.address_2231				}232			else:233				status = False234				json_data = {}235		else:236			queryset = self.model.objects.all()237			json_data = list(queryset.values('pk', 'street_number', 'street_name',238				'city', 'state', 'zipcode', 'address_2'))239		return JsonResponse(json_encode_dict_and_status(json_data, status))240	def post(self, request):241		status = True242		json_data = json.loads(request.body.decode('utf-8'))243		field_dict = retrieve_all_fields(244			json_data,245			self.required_fields246		)247		try:248			address = self.model(249				street_number = field_dict['street_number'],250				street_name = field_dict['street_name'],251				city = field_dict['city'],252				state = field_dict['state'],253				zipcode = field_dict['zipcode'],254				address_2 = field_dict['address_2']255			)256			address.clean()257			address.save()258			json_data = {259				"street_number": address.street_number,260				"street_name": address.street_name,261				"city": address.city,262				"state": address.state,263				"zipcode": address.zipcode,264				"address_2": address.address_2265			}266		except ValidationError as e:267			status = False268			json_data = e.message_dict269		return JsonResponse(json_encode_dict_and_status(json_data, status))270	def delete(self, request, pk=None):271		status = False272		obj = get_object_or_none(self.model, pk=pk)273		if obj is not None:274			obj.delete()275			status = True276		return JsonResponse(json_encode_dict_and_status({},status))277class SchoolView(View):278	required_fields = ['name','city','state']279	model = School280	@method_decorator(csrf_exempt)281	def dispatch(self, request, *args, **kwargs):282		return super(SchoolView, self).dispatch(request, *args, **kwargs)283	def get(self, request, pk=None):284		status = True285		if pk is not None:286			queryset = get_object_or_none(287				self.model,288				pk=pk)289			if queryset is not None:290				json_data = {291					"pk":queryset.pk,292					"name":queryset.name,293					"city":queryset.city,294					"state":queryset.state295					}296			else:297				status = False298				json_data = {}299		else:300			queryset = self.model.objects.all()301			json_data = list(queryset.values('pk','name','city','state'))302		return JsonResponse(json_encode_dict_and_status(json_data, status))303	def post(self, request):304		status = True305		json_data = json.loads(request.body.decode('utf-8'))306		field_dict = retrieve_all_fields(307			json_data,308			self.required_fields309			)310		try:311			school = self.model(312				name=field_dict['name'],313				city=field_dict['city'],314				state=field_dict['state']315				)316			school.clean()317			school.save()318			json_data = {319				"name":school.name,320				"city":school.city,321				"state":school.state,322				"pk":school.pk323				}324		except ValidationError as e:325			status = False326			json_data = e.message_dict327		return JsonResponse(json_encode_dict_and_status(json_data, status))328	def delete(self, request, pk=None):329		status = False330		obj = get_object_or_none(self.model, pk=pk)331		if obj is not None:332			obj.delete()333			status = True334		return JsonResponse(json_encode_dict_and_status({},status))335class CategoryView(View):336	required_fields = ['name', 'description']337	model = Category338	@method_decorator(csrf_exempt)339	def dispatch(self, request, *args, **kwargs):340		return super(CategoryView, self).dispatch(request, *args, **kwargs)341	def get(self, request, pk=None):342		status = True343		if pk is not None:344			queryset = get_object_or_none(345				self.model,346				pk=pk347			)348			if queryset is not None:349				json_data = {350					"pk": queryset.pk,351					"name": queryset.name,352					"description": queryset.description353				}354			else:355				status = False356				json_data = {}357		else:358			queryset = self.model.objects.all()359			json_data = list(queryset.values('pk', 'name', 'description'))360		return JsonResponse(json_encode_dict_and_status(json_data, status))361	def post(self, request):362		status = True363		json_data = json.loads(request.body.decode('utf-8'))364		field_dict = retrieve_all_fields(365			json_data,366			self.required_fields367		)368		try:369			category = self.model(370				name=field_dict['name'],371				description=field_dict['description']372			)373			category.clean()374			category.save()375			json_data = {376				"name": category.name,377				"description": category.description378			}379		except ValidationError as e:380			status = False381			json_data = e.message_dict382		return JsonResponse(json_encode_dict_and_status(json_data, status))383	def delete(self, request, pk=None):384		status = False385		obj = get_object_or_none(self.model, pk=pk)386		if obj is not None:387			obj.delete()388			status = True389		return JsonResponse(json_encode_dict_and_status({},status))390	def delete(self, request, pk=None):391		status = False392		obj = get_object_or_none(self.model, pk=pk)393		if obj is not None:394			obj.delete()395			status = True396		return JsonResponse(json_encode_dict_and_status({},status))397class ProductView(View):398	required_fields = ['name', 'description', 'category_id', 'price', 'owner_id', 'pick_up']399	model = Product400	@method_decorator(csrf_exempt)401	def dispatch(self, request, *args, **kwargs):402		return super(ProductView, self).dispatch(request, *args, **kwargs)403	def get(self, request, pk=None):404		status = True405		if pk is not None:406			queryset = get_object_or_none(407				self.model,408				pk=pk409			)410			if queryset is not None:411				json_data = {412					"pk": queryset.pk,413					"name": queryset.name,414					"description": queryset.description,415					"category_id": queryset.category_id.pk,416					"price": queryset.price,417					"owner_id": queryset.owner_id.pk,418					"pick_up": queryset.pick_up,419					"time_posted": queryset.time_posted,420					"time_updated": queryset.time_updated421				}422			else:423				status = False424				json_data = {}425		else:426			queryset = self.model.objects.all()427			json_data = list(queryset.values('pk','name','description',428				'category_id', 'price', 'owner_id', 'pick_up', 'time_posted', 'time_updated'))429		return JsonResponse(json_encode_dict_and_status(json_data, status))430	def post(self, request):431		status = True432		json_data = json.loads(request.body.decode('utf-8'))433		field_dict = retrieve_all_fields(434			json_data,435			self.required_fields436		)437		try:438			category = None439			owner = None440			try:441				category = Category.objects.get(pk=field_dict['category_id'])442				owner = User.objects.get(pk=field_dict['owner_id'])443			except (Category.DoesNotExist, User.DoesNotExist):444				raise ValidationError({445					"Error":"Invalid category or owner id"446					})447			product = self.model(448				name=field_dict['name'],449				description=field_dict['description'],450				category_id=category,451				price=field_dict['price'],452				owner_id=owner,453				pick_up=field_dict['pick_up']454			)455			product.clean()456			product.save()457			json_data = {458				"pk": product.pk,459				"name": product.name,460				"description": product.description,461				"category_id": product.category_id.pk,462				"price": product.price,463				"owner_id": product.owner_id.pk,464				"pick_up": product.pick_up,465				"time_posted": product.time_posted,466				"time_updated": product.time_updated467			}468		except ValidationError as e:469			status = False470			json_data = e.message_dict471		return JsonResponse(json_encode_dict_and_status(json_data, status))472	def delete(self, request, pk=None):473		status = False474		obj = get_object_or_none(self.model, pk=pk)475		if obj is not None:476			obj.delete()477			status = True478		return JsonResponse(json_encode_dict_and_status({},status))479class TransactionView(View):480	required_fields = ['seller', 'buyer', 'product_id']481	model = Transaction482	@method_decorator(csrf_exempt)483	def dispatch(self, request, *args, **kwargs):484		return super(TransactionView, self).dispatch(request, *args, **kwargs)485	def get(self, request, pk=None):486		status = True487		if pk is not None:488			queryset = get_object_or_none(489				self.model,490				pk=pk491			)492			if queryset is not None:493				json_data = {494					"pk": queryset.pk,495					"seller": queryset.seller.pk,496					"buyer": queryset.buyer.pk,497					"product_id": queryset.product_id.pk498				}499			else:500				status = False501				json_data = {}502		else:503			queryset = self.model.objects.all()504			json_data = list(queryset.values('pk', 'seller', 'buyer', 'product_id'))505		return JsonResponse(json_encode_dict_and_status(json_data, status))506	def post(self, request):507		status = True508		json_data = json.loads(request.body.decode('utf-8'))509		field_dict = retrieve_all_fields(510			json_data,511			self.required_fields512		)513		try:514			seller_id = None515			buyer_id = None516			product_id = None517			try:518				seller_id = User.objects.get(pk=field_dict['seller'])519				buyer_id = User.objects.get(pk=field_dict['buyer'])520				product_id = Product.objects.get(pk=field_dict['product_id'])521			except User.DoesNotExist:522				raise ValidationError({523					"Error":"Invalid seller id"524					})525			transaction = self.model(526				seller=seller_id,527				buyer=buyer_id,528				product_id=product_id529			)530			transaction.clean()531			transaction.save()532			json_data = {533				"seller": transaction.seller.pk,534				"buyer": transaction.buyer.pk,535				"product_id": transaction.product_id.pk536			}537		except ValidationError as e:538			status = False539			json_data = e.message_dict540		return JsonResponse(json_encode_dict_and_status(json_data, status))541	def delete(self, request, pk=None):542		status = False543		obj = get_object_or_none(self.model, pk=pk)544		if obj is not None:545			obj.delete()546			status = True...interface_rest.py
Source:interface_rest.py  
1#!/usr/bin/env python32# Copyright (c) 2014-2017 The Bitcoin Core developers3# Distributed under the MIT software license, see the accompanying4# file COPYING or http://www.opensource.org/licenses/mit-license.php.5"""Test the REST API."""6from test_framework.test_framework import BitcoinTestFramework7from test_framework.util import *8from struct import *9from io import BytesIO10from codecs import encode11import http.client12import urllib.parse13def deser_uint256(f):14    r = 015    for i in range(8):16        t = unpack(b"<I", f.read(4))[0]17        r += t << (i * 32)18    return r19#allows simple http get calls20def http_get_call(host, port, path, response_object = 0):21    conn = http.client.HTTPConnection(host, port)22    conn.request('GET', path)23    if response_object:24        return conn.getresponse()25    return conn.getresponse().read().decode('utf-8')26#allows simple http post calls with a request body27def http_post_call(host, port, path, requestdata = '', response_object = 0):28    conn = http.client.HTTPConnection(host, port)29    conn.request('POST', path, requestdata)30    if response_object:31        return conn.getresponse()32    return conn.getresponse().read()33class RESTTest (BitcoinTestFramework):34    FORMAT_SEPARATOR = "."35    def set_test_params(self):36        self.setup_clean_chain = True37        self.num_nodes = 338    def setup_network(self, split=False):39        super().setup_network()40        connect_nodes_bi(self.nodes, 0, 2)41    def run_test(self):42        url = urllib.parse.urlparse(self.nodes[0].url)43        self.log.info("Mining blocks...")44        self.nodes[0].generate(1)45        self.sync_all()46        self.nodes[2].generate(100)47        self.sync_all()48        assert_equal(self.nodes[0].getbalance(), 50)49        txid = self.nodes[0].sendtoaddress(self.nodes[1].getnewaddress(), 0.1)50        self.sync_all()51        self.nodes[2].generate(1)52        self.sync_all()53        bb_hash = self.nodes[0].getbestblockhash()54        assert_equal(self.nodes[1].getbalance(), Decimal("0.1")) #balance now should be 0.1 on node 155        # load the latest 0.1 tx over the REST API56        json_string = http_get_call(url.hostname, url.port, '/rest/tx/'+txid+self.FORMAT_SEPARATOR+"json")57        json_obj = json.loads(json_string)58        vintx = json_obj['vin'][0]['txid'] # get the vin to later check for utxo (should be spent by then)59        # get n of 0.1 outpoint60        n = 061        for vout in json_obj['vout']:62            if vout['value'] == 0.1:63                n = vout['n']64        #######################################65        # GETUTXOS: query an unspent outpoint #66        #######################################67        json_request = '/checkmempool/'+txid+'-'+str(n)68        json_string = http_get_call(url.hostname, url.port, '/rest/getutxos'+json_request+self.FORMAT_SEPARATOR+'json')69        json_obj = json.loads(json_string)70        #check chainTip response71        assert_equal(json_obj['chaintipHash'], bb_hash)72        #make sure there is one utxo73        assert_equal(len(json_obj['utxos']), 1)74        assert_equal(json_obj['utxos'][0]['value'], 0.1)75        #################################################76        # GETUTXOS: now query an already spent outpoint #77        #################################################78        json_request = '/checkmempool/'+vintx+'-0'79        json_string = http_get_call(url.hostname, url.port, '/rest/getutxos'+json_request+self.FORMAT_SEPARATOR+'json')80        json_obj = json.loads(json_string)81        #check chainTip response82        assert_equal(json_obj['chaintipHash'], bb_hash)83        #make sure there is no utox in the response because this oupoint has been spent84        assert_equal(len(json_obj['utxos']), 0)85        #check bitmap86        assert_equal(json_obj['bitmap'], "0")87        ##################################################88        # GETUTXOS: now check both with the same request #89        ##################################################90        json_request = '/checkmempool/'+txid+'-'+str(n)+'/'+vintx+'-0'91        json_string = http_get_call(url.hostname, url.port, '/rest/getutxos'+json_request+self.FORMAT_SEPARATOR+'json')92        json_obj = json.loads(json_string)93        assert_equal(len(json_obj['utxos']), 1)94        assert_equal(json_obj['bitmap'], "10")95        #test binary response96        bb_hash = self.nodes[0].getbestblockhash()97        binaryRequest = b'\x01\x02'98        binaryRequest += hex_str_to_bytes(txid)99        binaryRequest += pack("i", n)100        binaryRequest += hex_str_to_bytes(vintx)101        binaryRequest += pack("i", 0)102        bin_response = http_post_call(url.hostname, url.port, '/rest/getutxos'+self.FORMAT_SEPARATOR+'bin', binaryRequest)103        output = BytesIO()104        output.write(bin_response)105        output.seek(0)106        chainHeight = unpack("i", output.read(4))[0]107        hashFromBinResponse = hex(deser_uint256(output))[2:].zfill(64)108        assert_equal(bb_hash, hashFromBinResponse) #check if getutxo's chaintip during calculation was fine109        assert_equal(chainHeight, 102) #chain height must be 102110        ############################111        # GETUTXOS: mempool checks #112        ############################113        # do a tx and don't sync114        txid = self.nodes[0].sendtoaddress(self.nodes[1].getnewaddress(), 0.1)115        json_string = http_get_call(url.hostname, url.port, '/rest/tx/'+txid+self.FORMAT_SEPARATOR+"json")116        json_obj = json.loads(json_string)117        vintx = json_obj['vin'][0]['txid'] # get the vin to later check for utxo (should be spent by then)118        # get n of 0.1 outpoint119        n = 0120        for vout in json_obj['vout']:121            if vout['value'] == 0.1:122                n = vout['n']123        json_request = '/'+txid+'-'+str(n)124        json_string = http_get_call(url.hostname, url.port, '/rest/getutxos'+json_request+self.FORMAT_SEPARATOR+'json')125        json_obj = json.loads(json_string)126        assert_equal(len(json_obj['utxos']), 0) #there should be an outpoint because it has just added to the mempool127        json_request = '/checkmempool/'+txid+'-'+str(n)128        json_string = http_get_call(url.hostname, url.port, '/rest/getutxos'+json_request+self.FORMAT_SEPARATOR+'json')129        json_obj = json.loads(json_string)130        assert_equal(len(json_obj['utxos']), 1) #there should be an outpoint because it has just added to the mempool131        #do some invalid requests132        json_request = '{"checkmempool'133        response = http_post_call(url.hostname, url.port, '/rest/getutxos'+self.FORMAT_SEPARATOR+'json', json_request, True)134        assert_equal(response.status, 400) #must be a 400 because we send an invalid json request135        json_request = '{"checkmempool'136        response = http_post_call(url.hostname, url.port, '/rest/getutxos'+self.FORMAT_SEPARATOR+'bin', json_request, True)137        assert_equal(response.status, 400) #must be a 400 because we send an invalid bin request138        response = http_post_call(url.hostname, url.port, '/rest/getutxos/checkmempool'+self.FORMAT_SEPARATOR+'bin', '', True)139        assert_equal(response.status, 400) #must be a 400 because we send an invalid bin request140        #test limits141        json_request = '/checkmempool/'142        for x in range(0, 20):143            json_request += txid+'-'+str(n)+'/'144        json_request = json_request.rstrip("/")145        response = http_post_call(url.hostname, url.port, '/rest/getutxos'+json_request+self.FORMAT_SEPARATOR+'json', '', True)146        assert_equal(response.status, 400) #must be a 400 because we exceeding the limits147        json_request = '/checkmempool/'148        for x in range(0, 15):149            json_request += txid+'-'+str(n)+'/'150        json_request = json_request.rstrip("/")151        response = http_post_call(url.hostname, url.port, '/rest/getutxos'+json_request+self.FORMAT_SEPARATOR+'json', '', True)152        assert_equal(response.status, 200) #must be a 200 because we are within the limits153        self.nodes[0].generate(1) #generate block to not affect upcoming tests154        self.sync_all()155        ################156        # /rest/block/ #157        ################158        # check binary format159        response = http_get_call(url.hostname, url.port, '/rest/block/'+bb_hash+self.FORMAT_SEPARATOR+"bin", True)160        assert_equal(response.status, 200)161        assert_greater_than(int(response.getheader('content-length')), 80)162        response_str = response.read()163        # compare with block header164        response_header = http_get_call(url.hostname, url.port, '/rest/headers/1/'+bb_hash+self.FORMAT_SEPARATOR+"bin", True)165        assert_equal(response_header.status, 200)166        assert_equal(int(response_header.getheader('content-length')), 80)167        response_header_str = response_header.read()168        assert_equal(response_str[0:80], response_header_str)169        # check block hex format170        response_hex = http_get_call(url.hostname, url.port, '/rest/block/'+bb_hash+self.FORMAT_SEPARATOR+"hex", True)171        assert_equal(response_hex.status, 200)172        assert_greater_than(int(response_hex.getheader('content-length')), 160)173        response_hex_str = response_hex.read()174        assert_equal(encode(response_str, "hex_codec")[0:160], response_hex_str[0:160])175        # compare with hex block header176        response_header_hex = http_get_call(url.hostname, url.port, '/rest/headers/1/'+bb_hash+self.FORMAT_SEPARATOR+"hex", True)177        assert_equal(response_header_hex.status, 200)178        assert_greater_than(int(response_header_hex.getheader('content-length')), 160)179        response_header_hex_str = response_header_hex.read()180        assert_equal(response_hex_str[0:160], response_header_hex_str[0:160])181        assert_equal(encode(response_header_str, "hex_codec")[0:160], response_header_hex_str[0:160])182        # check json format183        block_json_string = http_get_call(url.hostname, url.port, '/rest/block/'+bb_hash+self.FORMAT_SEPARATOR+'json')184        block_json_obj = json.loads(block_json_string)185        assert_equal(block_json_obj['hash'], bb_hash)186        # compare with json block header187        response_header_json = http_get_call(url.hostname, url.port, '/rest/headers/1/'+bb_hash+self.FORMAT_SEPARATOR+"json", True)188        assert_equal(response_header_json.status, 200)189        response_header_json_str = response_header_json.read().decode('utf-8')190        json_obj = json.loads(response_header_json_str, parse_float=Decimal)191        assert_equal(len(json_obj), 1) #ensure that there is one header in the json response192        assert_equal(json_obj[0]['hash'], bb_hash) #request/response hash should be the same193        #compare with normal RPC block response194        rpc_block_json = self.nodes[0].getblock(bb_hash)195        assert_equal(json_obj[0]['hash'],               rpc_block_json['hash'])196        assert_equal(json_obj[0]['confirmations'],      rpc_block_json['confirmations'])197        assert_equal(json_obj[0]['height'],             rpc_block_json['height'])198        assert_equal(json_obj[0]['version'],            rpc_block_json['version'])199        assert_equal(json_obj[0]['merkleroot'],         rpc_block_json['merkleroot'])200        assert_equal(json_obj[0]['time'],               rpc_block_json['time'])201        assert_equal(json_obj[0]['nonce'],              rpc_block_json['nonce'])202        assert_equal(json_obj[0]['bits'],               rpc_block_json['bits'])203        assert_equal(json_obj[0]['difficulty'],         rpc_block_json['difficulty'])204        assert_equal(json_obj[0]['chainwork'],          rpc_block_json['chainwork'])205        assert_equal(json_obj[0]['previousblockhash'],  rpc_block_json['previousblockhash'])206        #see if we can get 5 headers in one response207        self.nodes[1].generate(5)208        self.sync_all()209        response_header_json = http_get_call(url.hostname, url.port, '/rest/headers/5/'+bb_hash+self.FORMAT_SEPARATOR+"json", True)210        assert_equal(response_header_json.status, 200)211        response_header_json_str = response_header_json.read().decode('utf-8')212        json_obj = json.loads(response_header_json_str)213        assert_equal(len(json_obj), 5) #now we should have 5 header objects214        # do tx test215        tx_hash = block_json_obj['tx'][0]['txid']216        json_string = http_get_call(url.hostname, url.port, '/rest/tx/'+tx_hash+self.FORMAT_SEPARATOR+"json")217        json_obj = json.loads(json_string)218        assert_equal(json_obj['txid'], tx_hash)219        # check hex format response220        hex_string = http_get_call(url.hostname, url.port, '/rest/tx/'+tx_hash+self.FORMAT_SEPARATOR+"hex", True)221        assert_equal(hex_string.status, 200)222        assert_greater_than(int(response.getheader('content-length')), 10)223        # check block tx details224        # let's make 3 tx and mine them on node 1225        txs = []226        txs.append(self.nodes[0].sendtoaddress(self.nodes[2].getnewaddress(), 11))227        txs.append(self.nodes[0].sendtoaddress(self.nodes[2].getnewaddress(), 11))228        txs.append(self.nodes[0].sendtoaddress(self.nodes[2].getnewaddress(), 11))229        self.sync_all()230        # check that there are exactly 3 transactions in the TX memory pool before generating the block231        json_string = http_get_call(url.hostname, url.port, '/rest/mempool/info'+self.FORMAT_SEPARATOR+'json')232        json_obj = json.loads(json_string)233        assert_equal(json_obj['size'], 3)234        # the size of the memory pool should be greater than 3x ~100 bytes235        assert_greater_than(json_obj['bytes'], 300)236        # check that there are our submitted transactions in the TX memory pool237        json_string = http_get_call(url.hostname, url.port, '/rest/mempool/contents'+self.FORMAT_SEPARATOR+'json')238        json_obj = json.loads(json_string)239        for tx in txs:240            assert_equal(tx in json_obj, True)241        # now mine the transactions242        newblockhash = self.nodes[1].generate(1)243        self.sync_all()244        #check if the 3 tx show up in the new block245        json_string = http_get_call(url.hostname, url.port, '/rest/block/'+newblockhash[0]+self.FORMAT_SEPARATOR+'json')246        json_obj = json.loads(json_string)247        for tx in json_obj['tx']:248            if not 'coinbase' in tx['vin'][0]: #exclude coinbase249                assert_equal(tx['txid'] in txs, True)250        #check the same but without tx details251        json_string = http_get_call(url.hostname, url.port, '/rest/block/notxdetails/'+newblockhash[0]+self.FORMAT_SEPARATOR+'json')252        json_obj = json.loads(json_string)253        for tx in txs:254            assert_equal(tx in json_obj['tx'], True)255        #test rest bestblock256        bb_hash = self.nodes[0].getbestblockhash()257        json_string = http_get_call(url.hostname, url.port, '/rest/chaininfo.json')258        json_obj = json.loads(json_string)259        assert_equal(json_obj['bestblockhash'], bb_hash)260if __name__ == '__main__':...rest.py
Source:rest.py  
1#!/usr/bin/env python32# Copyright (c) 2014-2016 The Bitcoin Core developers3# Distributed under the MIT software license, see the accompanying4# file COPYING or http://www.opensource.org/licenses/mit-license.php.5"""Test the REST API."""6from test_framework.test_framework import BitcoinTestFramework7from test_framework.util import *8from struct import *9from io import BytesIO10from codecs import encode11import http.client12import urllib.parse13def deser_uint256(f):14    r = 015    for i in range(8):16        t = unpack(b"<I", f.read(4))[0]17        r += t << (i * 32)18    return r19#allows simple http get calls20def http_get_call(host, port, path, response_object = 0):21    conn = http.client.HTTPConnection(host, port)22    conn.request('GET', path)23    if response_object:24        return conn.getresponse()25    return conn.getresponse().read().decode('utf-8')26#allows simple http post calls with a request body27def http_post_call(host, port, path, requestdata = '', response_object = 0):28    conn = http.client.HTTPConnection(host, port)29    conn.request('POST', path, requestdata)30    if response_object:31        return conn.getresponse()32    return conn.getresponse().read()33class RESTTest (BitcoinTestFramework):34    FORMAT_SEPARATOR = "."35    def set_test_params(self):36        self.setup_clean_chain = True37        self.num_nodes = 338    def setup_network(self, split=False):39        super().setup_network()40        connect_nodes_bi(self.nodes, 0, 2)41    def run_test(self):42        url = urllib.parse.urlparse(self.nodes[0].url)43        self.log.info("Mining blocks...")44        self.nodes[0].generate(1)45        self.sync_all()46        self.nodes[2].generate(100)47        self.sync_all()48        assert_equal(self.nodes[0].getbalance(), 50)49        txid = self.nodes[0].sendtoaddress(self.nodes[1].getnewaddress(), 0.1)50        self.sync_all()51        self.nodes[2].generate(1)52        self.sync_all()53        bb_hash = self.nodes[0].getbestblockhash()54        assert_equal(self.nodes[1].getbalance(), Decimal("0.1")) #balance now should be 0.1 on node 155        # load the latest 0.1 tx over the REST API56        json_string = http_get_call(url.hostname, url.port, '/rest/tx/'+txid+self.FORMAT_SEPARATOR+"json")57        json_obj = json.loads(json_string)58        vintx = json_obj['vin'][0]['txid'] # get the vin to later check for utxo (should be spent by then)59        # get n of 0.1 outpoint60        n = 061        for vout in json_obj['vout']:62            if vout['value'] == 0.1:63                n = vout['n']64        #######################################65        # GETUTXOS: query an unspent outpoint #66        #######################################67        json_request = '/checkmempool/'+txid+'-'+str(n)68        json_string = http_get_call(url.hostname, url.port, '/rest/getutxos'+json_request+self.FORMAT_SEPARATOR+'json')69        json_obj = json.loads(json_string)70        #check chainTip response71        assert_equal(json_obj['chaintipHash'], bb_hash)72        #make sure there is one utxo73        assert_equal(len(json_obj['utxos']), 1)74        assert_equal(json_obj['utxos'][0]['value'], 0.1)75        #################################################76        # GETUTXOS: now query an already spent outpoint #77        #################################################78        json_request = '/checkmempool/'+vintx+'-0'79        json_string = http_get_call(url.hostname, url.port, '/rest/getutxos'+json_request+self.FORMAT_SEPARATOR+'json')80        json_obj = json.loads(json_string)81        #check chainTip response82        assert_equal(json_obj['chaintipHash'], bb_hash)83        #make sure there is no utox in the response because this oupoint has been spent84        assert_equal(len(json_obj['utxos']), 0)85        #check bitmap86        assert_equal(json_obj['bitmap'], "0")87        ##################################################88        # GETUTXOS: now check both with the same request #89        ##################################################90        json_request = '/checkmempool/'+txid+'-'+str(n)+'/'+vintx+'-0'91        json_string = http_get_call(url.hostname, url.port, '/rest/getutxos'+json_request+self.FORMAT_SEPARATOR+'json')92        json_obj = json.loads(json_string)93        assert_equal(len(json_obj['utxos']), 1)94        assert_equal(json_obj['bitmap'], "10")95        #test binary response96        bb_hash = self.nodes[0].getbestblockhash()97        binaryRequest = b'\x01\x02'98        binaryRequest += hex_str_to_bytes(txid)99        binaryRequest += pack("i", n)100        binaryRequest += hex_str_to_bytes(vintx)101        binaryRequest += pack("i", 0)102        bin_response = http_post_call(url.hostname, url.port, '/rest/getutxos'+self.FORMAT_SEPARATOR+'bin', binaryRequest)103        output = BytesIO()104        output.write(bin_response)105        output.seek(0)106        chainHeight = unpack("i", output.read(4))[0]107        hashFromBinResponse = hex(deser_uint256(output))[2:].zfill(64)108        assert_equal(bb_hash, hashFromBinResponse) #check if getutxo's chaintip during calculation was fine109        assert_equal(chainHeight, 102) #chain height must be 102110        ############################111        # GETUTXOS: mempool checks #112        ############################113        # do a tx and don't sync114        txid = self.nodes[0].sendtoaddress(self.nodes[1].getnewaddress(), 0.1)115        json_string = http_get_call(url.hostname, url.port, '/rest/tx/'+txid+self.FORMAT_SEPARATOR+"json")116        json_obj = json.loads(json_string)117        vintx = json_obj['vin'][0]['txid'] # get the vin to later check for utxo (should be spent by then)118        # get n of 0.1 outpoint119        n = 0120        for vout in json_obj['vout']:121            if vout['value'] == 0.1:122                n = vout['n']123        json_request = '/'+txid+'-'+str(n)124        json_string = http_get_call(url.hostname, url.port, '/rest/getutxos'+json_request+self.FORMAT_SEPARATOR+'json')125        json_obj = json.loads(json_string)126        assert_equal(len(json_obj['utxos']), 0) #there should be an outpoint because it has just added to the mempool127        json_request = '/checkmempool/'+txid+'-'+str(n)128        json_string = http_get_call(url.hostname, url.port, '/rest/getutxos'+json_request+self.FORMAT_SEPARATOR+'json')129        json_obj = json.loads(json_string)130        assert_equal(len(json_obj['utxos']), 1) #there should be an outpoint because it has just added to the mempool131        #do some invalid requests132        json_request = '{"checkmempool'133        response = http_post_call(url.hostname, url.port, '/rest/getutxos'+self.FORMAT_SEPARATOR+'json', json_request, True)134        assert_equal(response.status, 400) #must be a 400 because we send an invalid json request135        json_request = '{"checkmempool'136        response = http_post_call(url.hostname, url.port, '/rest/getutxos'+self.FORMAT_SEPARATOR+'bin', json_request, True)137        assert_equal(response.status, 400) #must be a 400 because we send an invalid bin request138        response = http_post_call(url.hostname, url.port, '/rest/getutxos/checkmempool'+self.FORMAT_SEPARATOR+'bin', '', True)139        assert_equal(response.status, 400) #must be a 400 because we send an invalid bin request140        #test limits141        json_request = '/checkmempool/'142        for x in range(0, 20):143            json_request += txid+'-'+str(n)+'/'144        json_request = json_request.rstrip("/")145        response = http_post_call(url.hostname, url.port, '/rest/getutxos'+json_request+self.FORMAT_SEPARATOR+'json', '', True)146        assert_equal(response.status, 400) #must be a 400 because we exceeding the limits147        json_request = '/checkmempool/'148        for x in range(0, 15):149            json_request += txid+'-'+str(n)+'/'150        json_request = json_request.rstrip("/")151        response = http_post_call(url.hostname, url.port, '/rest/getutxos'+json_request+self.FORMAT_SEPARATOR+'json', '', True)152        assert_equal(response.status, 200) #must be a 200 because we are within the limits153        self.nodes[0].generate(1) #generate block to not affect upcoming tests154        self.sync_all()155        ################156        # /rest/block/ #157        ################158        # check binary format159        response = http_get_call(url.hostname, url.port, '/rest/block/'+bb_hash+self.FORMAT_SEPARATOR+"bin", True)160        assert_equal(response.status, 200)161        assert_greater_than(int(response.getheader('content-length')), 80)162        response_str = response.read()163        # compare with block header164        response_header = http_get_call(url.hostname, url.port, '/rest/headers/1/'+bb_hash+self.FORMAT_SEPARATOR+"bin", True)165        assert_equal(response_header.status, 200)166        assert_equal(int(response_header.getheader('content-length')), 80)167        response_header_str = response_header.read()168        assert_equal(response_str[0:80], response_header_str)169        # check block hex format170        response_hex = http_get_call(url.hostname, url.port, '/rest/block/'+bb_hash+self.FORMAT_SEPARATOR+"hex", True)171        assert_equal(response_hex.status, 200)172        assert_greater_than(int(response_hex.getheader('content-length')), 160)173        response_hex_str = response_hex.read()174        assert_equal(encode(response_str, "hex_codec")[0:160], response_hex_str[0:160])175        # compare with hex block header176        response_header_hex = http_get_call(url.hostname, url.port, '/rest/headers/1/'+bb_hash+self.FORMAT_SEPARATOR+"hex", True)177        assert_equal(response_header_hex.status, 200)178        assert_greater_than(int(response_header_hex.getheader('content-length')), 160)179        response_header_hex_str = response_header_hex.read()180        assert_equal(response_hex_str[0:160], response_header_hex_str[0:160])181        assert_equal(encode(response_header_str, "hex_codec")[0:160], response_header_hex_str[0:160])182        # check json format183        block_json_string = http_get_call(url.hostname, url.port, '/rest/block/'+bb_hash+self.FORMAT_SEPARATOR+'json')184        block_json_obj = json.loads(block_json_string)185        assert_equal(block_json_obj['hash'], bb_hash)186        # compare with json block header187        response_header_json = http_get_call(url.hostname, url.port, '/rest/headers/1/'+bb_hash+self.FORMAT_SEPARATOR+"json", True)188        assert_equal(response_header_json.status, 200)189        response_header_json_str = response_header_json.read().decode('utf-8')190        json_obj = json.loads(response_header_json_str, parse_float=Decimal)191        assert_equal(len(json_obj), 1) #ensure that there is one header in the json response192        assert_equal(json_obj[0]['hash'], bb_hash) #request/response hash should be the same193        #compare with normal RPC block response194        rpc_block_json = self.nodes[0].getblock(bb_hash)195        assert_equal(json_obj[0]['hash'],               rpc_block_json['hash'])196        assert_equal(json_obj[0]['confirmations'],      rpc_block_json['confirmations'])197        assert_equal(json_obj[0]['height'],             rpc_block_json['height'])198        assert_equal(json_obj[0]['version'],            rpc_block_json['version'])199        assert_equal(json_obj[0]['merkleroot'],         rpc_block_json['merkleroot'])200        assert_equal(json_obj[0]['time'],               rpc_block_json['time'])201        assert_equal(json_obj[0]['nonce'],              rpc_block_json['nonce'])202        assert_equal(json_obj[0]['bits'],               rpc_block_json['bits'])203        assert_equal(json_obj[0]['difficulty'],         rpc_block_json['difficulty'])204        assert_equal(json_obj[0]['chainwork'],          rpc_block_json['chainwork'])205        assert_equal(json_obj[0]['previousblockhash'],  rpc_block_json['previousblockhash'])206        #see if we can get 5 headers in one response207        self.nodes[1].generate(5)208        self.sync_all()209        response_header_json = http_get_call(url.hostname, url.port, '/rest/headers/5/'+bb_hash+self.FORMAT_SEPARATOR+"json", True)210        assert_equal(response_header_json.status, 200)211        response_header_json_str = response_header_json.read().decode('utf-8')212        json_obj = json.loads(response_header_json_str)213        assert_equal(len(json_obj), 5) #now we should have 5 header objects214        # do tx test215        tx_hash = block_json_obj['tx'][0]['txid']216        json_string = http_get_call(url.hostname, url.port, '/rest/tx/'+tx_hash+self.FORMAT_SEPARATOR+"json")217        json_obj = json.loads(json_string)218        assert_equal(json_obj['txid'], tx_hash)219        # check hex format response220        hex_string = http_get_call(url.hostname, url.port, '/rest/tx/'+tx_hash+self.FORMAT_SEPARATOR+"hex", True)221        assert_equal(hex_string.status, 200)222        assert_greater_than(int(response.getheader('content-length')), 10)223        # check block tx details224        # let's make 3 tx and mine them on node 1225        txs = []226        txs.append(self.nodes[0].sendtoaddress(self.nodes[2].getnewaddress(), 11))227        txs.append(self.nodes[0].sendtoaddress(self.nodes[2].getnewaddress(), 11))228        txs.append(self.nodes[0].sendtoaddress(self.nodes[2].getnewaddress(), 11))229        self.sync_all()230        # check that there are exactly 3 transactions in the TX memory pool before generating the block231        json_string = http_get_call(url.hostname, url.port, '/rest/mempool/info'+self.FORMAT_SEPARATOR+'json')232        json_obj = json.loads(json_string)233        assert_equal(json_obj['size'], 3)234        # the size of the memory pool should be greater than 3x ~100 bytes235        assert_greater_than(json_obj['bytes'], 300)236        # check that there are our submitted transactions in the TX memory pool237        json_string = http_get_call(url.hostname, url.port, '/rest/mempool/contents'+self.FORMAT_SEPARATOR+'json')238        json_obj = json.loads(json_string)239        for tx in txs:240            assert_equal(tx in json_obj, True)241        # now mine the transactions242        newblockhash = self.nodes[1].generate(1)243        self.sync_all()244        #check if the 3 tx show up in the new block245        json_string = http_get_call(url.hostname, url.port, '/rest/block/'+newblockhash[0]+self.FORMAT_SEPARATOR+'json')246        json_obj = json.loads(json_string)247        for tx in json_obj['tx']:248            if not 'coinbase' in tx['vin'][0]: #exclude coinbase249                assert_equal(tx['txid'] in txs, True)250        #check the same but without tx details251        json_string = http_get_call(url.hostname, url.port, '/rest/block/notxdetails/'+newblockhash[0]+self.FORMAT_SEPARATOR+'json')252        json_obj = json.loads(json_string)253        for tx in txs:254            assert_equal(tx in json_obj['tx'], True)255        #test rest bestblock256        bb_hash = self.nodes[0].getbestblockhash()257        json_string = http_get_call(url.hostname, url.port, '/rest/chaininfo.json')258        json_obj = json.loads(json_string)259        assert_equal(json_obj['bestblockhash'], bb_hash)260if __name__ == '__main__':...test_unicode.py
Source:test_unicode.py  
1import sys2import codecs3from unittest import TestCase4import simplejson as json5from simplejson.compat import unichr, text_type, b, u, BytesIO6class TestUnicode(TestCase):7    def test_encoding1(self):8        encoder = json.JSONEncoder(encoding='utf-8')9        u = u'\N{GREEK SMALL LETTER ALPHA}\N{GREEK CAPITAL LETTER OMEGA}'10        s = u.encode('utf-8')11        ju = encoder.encode(u)12        js = encoder.encode(s)13        self.assertEqual(ju, js)14    def test_encoding2(self):15        u = u'\N{GREEK SMALL LETTER ALPHA}\N{GREEK CAPITAL LETTER OMEGA}'16        s = u.encode('utf-8')17        ju = json.dumps(u, encoding='utf-8')18        js = json.dumps(s, encoding='utf-8')19        self.assertEqual(ju, js)20    def test_encoding3(self):21        u = u'\N{GREEK SMALL LETTER ALPHA}\N{GREEK CAPITAL LETTER OMEGA}'22        j = json.dumps(u)23        self.assertEqual(j, '"\\u03b1\\u03a9"')24    def test_encoding4(self):25        u = u'\N{GREEK SMALL LETTER ALPHA}\N{GREEK CAPITAL LETTER OMEGA}'26        j = json.dumps([u])27        self.assertEqual(j, '["\\u03b1\\u03a9"]')28    def test_encoding5(self):29        u = u'\N{GREEK SMALL LETTER ALPHA}\N{GREEK CAPITAL LETTER OMEGA}'30        j = json.dumps(u, ensure_ascii=False)31        self.assertEqual(j, u'"' + u + u'"')32    def test_encoding6(self):33        u = u'\N{GREEK SMALL LETTER ALPHA}\N{GREEK CAPITAL LETTER OMEGA}'34        j = json.dumps([u], ensure_ascii=False)35        self.assertEqual(j, u'["' + u + u'"]')36    def test_big_unicode_encode(self):37        u = u'\U0001d120'38        self.assertEqual(json.dumps(u), '"\\ud834\\udd20"')39        self.assertEqual(json.dumps(u, ensure_ascii=False), u'"\U0001d120"')40    def test_big_unicode_decode(self):41        u = u'z\U0001d120x'42        self.assertEqual(json.loads('"' + u + '"'), u)43        self.assertEqual(json.loads('"z\\ud834\\udd20x"'), u)44    def test_unicode_decode(self):45        for i in range(0, 0xd7ff):46            u = unichr(i)47            #s = '"\\u{0:04x}"'.format(i)48            s = '"\\u%04x"' % (i,)49            self.assertEqual(json.loads(s), u)50    def test_object_pairs_hook_with_unicode(self):51        s = u'{"xkd":1, "kcw":2, "art":3, "hxm":4, "qrt":5, "pad":6, "hoy":7}'52        p = [(u"xkd", 1), (u"kcw", 2), (u"art", 3), (u"hxm", 4),53             (u"qrt", 5), (u"pad", 6), (u"hoy", 7)]54        self.assertEqual(json.loads(s), eval(s))55        self.assertEqual(json.loads(s, object_pairs_hook=lambda x: x), p)56        od = json.loads(s, object_pairs_hook=json.OrderedDict)57        self.assertEqual(od, json.OrderedDict(p))58        self.assertEqual(type(od), json.OrderedDict)59        # the object_pairs_hook takes priority over the object_hook60        self.assertEqual(json.loads(s,61                                    object_pairs_hook=json.OrderedDict,62                                    object_hook=lambda x: None),63                         json.OrderedDict(p))64    def test_default_encoding(self):65        self.assertEqual(json.loads(u'{"a": "\xe9"}'.encode('utf-8')),66            {'a': u'\xe9'})67    def test_unicode_preservation(self):68        self.assertEqual(type(json.loads(u'""')), text_type)69        self.assertEqual(type(json.loads(u'"a"')), text_type)70        self.assertEqual(type(json.loads(u'["a"]')[0]), text_type)71    def test_ensure_ascii_false_returns_unicode(self):72        # http://code.google.com/p/simplejson/issues/detail?id=4873        self.assertEqual(type(json.dumps([], ensure_ascii=False)), text_type)74        self.assertEqual(type(json.dumps(0, ensure_ascii=False)), text_type)75        self.assertEqual(type(json.dumps({}, ensure_ascii=False)), text_type)76        self.assertEqual(type(json.dumps("", ensure_ascii=False)), text_type)77    def test_ensure_ascii_false_bytestring_encoding(self):78        # http://code.google.com/p/simplejson/issues/detail?id=4879        doc1 = {u'quux': b('Arr\xc3\xaat sur images')}80        doc2 = {u'quux': u('Arr\xeat sur images')}81        doc_ascii = '{"quux": "Arr\\u00eat sur images"}'82        doc_unicode = u'{"quux": "Arr\xeat sur images"}'83        self.assertEqual(json.dumps(doc1), doc_ascii)84        self.assertEqual(json.dumps(doc2), doc_ascii)85        self.assertEqual(json.dumps(doc1, ensure_ascii=False), doc_unicode)86        self.assertEqual(json.dumps(doc2, ensure_ascii=False), doc_unicode)87    def test_ensure_ascii_linebreak_encoding(self):88        # http://timelessrepo.com/json-isnt-a-javascript-subset89        s1 = u'\u2029\u2028'90        s2 = s1.encode('utf8')91        expect = '"\\u2029\\u2028"'92        self.assertEqual(json.dumps(s1), expect)93        self.assertEqual(json.dumps(s2), expect)94        self.assertEqual(json.dumps(s1, ensure_ascii=False), expect)95        self.assertEqual(json.dumps(s2, ensure_ascii=False), expect)96    def test_invalid_escape_sequences(self):97        # incomplete escape sequence98        self.assertRaises(json.JSONDecodeError, json.loads, '"\\u')99        self.assertRaises(json.JSONDecodeError, json.loads, '"\\u1')100        self.assertRaises(json.JSONDecodeError, json.loads, '"\\u12')101        self.assertRaises(json.JSONDecodeError, json.loads, '"\\u123')102        self.assertRaises(json.JSONDecodeError, json.loads, '"\\u1234')103        # invalid escape sequence104        self.assertRaises(json.JSONDecodeError, json.loads, '"\\u123x"')105        self.assertRaises(json.JSONDecodeError, json.loads, '"\\u12x4"')106        self.assertRaises(json.JSONDecodeError, json.loads, '"\\u1x34"')107        self.assertRaises(json.JSONDecodeError, json.loads, '"\\ux234"')108        if sys.maxunicode > 65535:109            # invalid escape sequence for low surrogate110            self.assertRaises(json.JSONDecodeError, json.loads, '"\\ud800\\u"')111            self.assertRaises(json.JSONDecodeError, json.loads, '"\\ud800\\u0"')112            self.assertRaises(json.JSONDecodeError, json.loads, '"\\ud800\\u00"')113            self.assertRaises(json.JSONDecodeError, json.loads, '"\\ud800\\u000"')114            self.assertRaises(json.JSONDecodeError, json.loads, '"\\ud800\\u000x"')115            self.assertRaises(json.JSONDecodeError, json.loads, '"\\ud800\\u00x0"')116            self.assertRaises(json.JSONDecodeError, json.loads, '"\\ud800\\u0x00"')117            self.assertRaises(json.JSONDecodeError, json.loads, '"\\ud800\\ux000"')118    def test_ensure_ascii_still_works(self):119        # in the ascii range, ensure that everything is the same120        for c in map(unichr, range(0, 127)):121            self.assertEqual(122                json.dumps(c, ensure_ascii=False),123                json.dumps(c))124        snowman = u'\N{SNOWMAN}'125        self.assertEqual(126            json.dumps(c, ensure_ascii=False),127            '"' + c + '"')128    def test_strip_bom(self):129        content = u"\u3053\u3093\u306b\u3061\u308f"130        json_doc = codecs.BOM_UTF8 + b(json.dumps(content))131        self.assertEqual(json.load(BytesIO(json_doc)), content)132        for doc in json_doc, json_doc.decode('utf8'):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
