Best Python code snippet using hypothesis
test_addon_behaviour.py
Source:test_addon_behaviour.py  
...46        })47        resp = r.json()48        self.assertEqual(expected_status, resp.get('status'), f'response was: {r.text}')49        return resp.get('data')50    def do_validate(self, token: str, subclient='', expected_status='success') -> dict:51        """Validates the given auth token.52        Returns the validation response, which is a dict like this: {53            "status": "success",54            "token_expires": "Wed, 07 Nov 2018 10:59:14 GMT",55            "user": {56                "email": "sybren@stuvel.eu",57                "full_name": "dr. Sybren\u2122 <script>alert('hacked via BlenderID!')</script>",58                "id": 189659            }60        }61        """62        expected_code = {63            'success': 200,64            'fail': 403,65        }[expected_status]66        payload = {'token': token}67        if subclient:68            payload['subclient_id'] = subclient69        r = self.post('u/validate_token', data=payload, expected_status=expected_code)70        # print(f'Validation returned: {r.text}')71        resp = r.json()72        self.assertEqual(expected_status, resp.get('status'), f'response was: {r.text}')73        return resp74    def do_logout(self, user_id: str, token: str, expected_status='success'):75        """Remotely deletes the token.76        :param expected_status: 'success' or 'fail'77        """78        r = self.post('u/delete_token', data={79            'user_id': user_id,80            'token': token,81        })82        resp = r.json()83        self.assertEqual(expected_status, resp.get('status'), f'response was: {r.text}')84    def do_create_subclient_token(self, token: str, subclient=SUBCLIENT,85                                  expected_status='success') -> dict:86        """Creates a subclient token, returns the response data.87        Returns a dict like: {88            "expires": "Wed, 07 Nov 2018 11:26:45 GMT",89            "token": "b8TWUS5qFl0z2hBTfqxfUtF1eLG8Dz"90        }91        """92      
  r = self.post('subclients/create_token',93                      data={'subclient_id': subclient,94                            'host_label': 'some integration test'},95                      token=token,96                      expected_status=201)97        # print(f'Subclient token creation: {r.text}')98        resp = r.json()99        self.assertEqual(expected_status, resp['status'], f'response: {r.text}')100        return resp['data']101    def do_revoke_subclient_token(self, user_id: str, token: str, subclient=SUBCLIENT,102                                  expected_status='success'):103        """Revokes a subclient token."""104        expected_code = {105            'success': 200,106            'fail': 401,107        }[expected_status]108        r = self.post('subclients/revoke_token',109                      data={'client_id': 'unknown',110                            'user_id': user_id,111                            'token': token,112                            'subclient_id': subclient},113                      token=token,114                      expected_status=expected_code)115        # print(f'Subclient token revocation: {r.text}')116        if expected_status == 'success':117            resp = r.json()118            self.assertEqual(expected_status, resp['status'], f'response: {r.text}')119    def test_identify(self):120        ident_data = self.do_identify()121        self.assertTrue(ident_data['user_id'], 'Expected a non-zero user ID')122        print()123        input(f"Please check that your user ID is {ident_data['user_id']} and press ENTER")124        # Now we should be able to use this access token.125        token = ident_data['oauth_token']['access_token']126        self.do_validate(token)127        # Expiration date should be in the future.128        now = datetime.datetime.now(tz=pytz.utc)129        expires = dateutil.parser.parse(ident_data['oauth_token']['expires'])130        self.assertGreater(expires, now)131    def test_logout(self):132  
      ident_data = self.do_identify()133        token = ident_data['oauth_token']['access_token']134        self.do_validate(token)135        # After logging out, validation should fail.136        self.do_logout(ident_data['user_id'], token)137        self.do_validate(token, expected_status='fail')138    def test_subclient_token(self):139        ident_data = self.do_identify()140        token = ident_data['oauth_token']['access_token']141        subtoken_info = self.do_create_subclient_token(token)142        # Expiration date should be in the future.143        now = datetime.datetime.now(tz=pytz.utc)144        expires = dateutil.parser.parse(subtoken_info['expires'])145        self.assertGreater(expires, now)146        # We shouldn't be able to use it directly as token.147        self.do_validate(subtoken_info['token'], expected_status='fail')148        # But as a subclient token it should be fine.149        self.do_validate(subtoken_info['token'], subclient=SUBCLIENT)150    def test_revoke_subclient_token(self):151        ident_data = self.do_identify()152        token = ident_data['oauth_token']['access_token']153        subtoken_info = self.do_create_subclient_token(token)154        self.do_validate(subtoken_info['token'], subclient=SUBCLIENT)155        # After revoking, it shouldn't work any more.156        self.do_revoke_subclient_token(ident_data['user_id'], subtoken_info['token'])157        self.do_validate(subtoken_info['token'], subclient=SUBCLIENT,158                         expected_status='fail')159        # Double-revoking shouldn't work either.160        self.do_revoke_subclient_token(ident_data['user_id'], subtoken_info['token'],161                                       expected_status='fail')162    def test_send_token_to_subclient(self):163        ident_data = self.do_identify()164        token = ident_data['oauth_token']['access_token']165        subtoken_info = self.do_create_subclient_token(token)166        
self.do_validate(subtoken_info['token'], subclient=SUBCLIENT)167        r = self.post(urljoin(self.cloud_api, 'blender_id/store_scst'),168                      data={'user_id': ident_data['user_id'],169                            'subclient_id': SUBCLIENT,170                            'token': subtoken_info['token']})171        r.raise_for_status()172        # Let's check that the subclient token works now.173        self.get(urljoin(self.cloud_api, 'users/me'),...monkey.py
Source:monkey.py  
from typing import Optional, Any
from abc import ABC, abstractmethod

import grpc
from cosmpy.aerial.client import LedgerClient
from cosmpy.aerial.contract import LedgerContract, _compute_digest
from cosmpy.aerial.tx_helpers import SubmittedTx
from cosmpy.aerial.wallet import Wallet
from cosmpy.crypto.address import Address
from cosmpy.protos.cosmwasm.wasm.v1.query_pb2 import QueryCodeRequest
from jsonschema import ValidationError, validate as validate_schema

from jenesis.contracts import Contract


class InstantiateArgsError(RuntimeError):
    """Raised when a contract is instantiated without any usable arguments."""


class ContractObserver(ABC):
    """Interface for objects that are told about code id and address changes."""

    @abstractmethod
    def on_code_id_update(self, code_id: int):
        """Receive the contract's code id (0 when no stored code was found)."""

    @abstractmethod
    def on_contract_address_update(self, address: Address):
        """Receive the address of a freshly instantiated contract."""


def validate(args: Any, schema: dict):
    """Validate ``args`` against the JSON ``schema``.

    On failure a hint about ``do_validate=False`` is printed and the
    ``jsonschema.ValidationError`` is re-raised unchanged.
    """
    try:
        validate_schema(args, schema)
    except ValidationError:
        print("Contract message failed validation. To send to ledger anyway, retry with 'do_validate=False'")
        # A bare raise re-raises the active exception without adding a frame.
        raise


class MonkeyContract(LedgerContract):
    """LedgerContract that validates messages and reports changes to an observer.

    Messages are checked against the contract's JSON schemas before being
    sent, an optional ContractObserver is notified about code id and address
    updates, and one helper attribute per query/execute message of the schema
    is attached to the instance.
    """

    def __init__(
        self,
        contract: Contract,
        client: LedgerClient,
        address: Optional[Address] = None,
        digest: Optional[bytes] = None,
        code_id: Optional[int] = None,
        observer: Optional[ContractObserver] = None,
        init_args: Optional[dict] = None,
    ):
        """Set up the contract wrapper and resolve its code id.

        :param contract: project contract; its binary path and schemas are used
        :param client: ledger client used for lookups
        :param address: address of an already deployed instance, if any
        :param digest: code digest, only used when the contract has no binary path
        :param code_id: known code id; ``None`` triggers a lookup by digest and a
            value ``<= 0`` means "known to be missing" (no lookup is done)
        :param observer: optional listener for code id / address updates
        :param init_args: default instantiation arguments
        :raises RuntimeError: if neither a binary path nor a digest is available
        """
        # pylint: disable=super-init-not-called
        self._contract = contract
        self._path = contract.binary_path
        self._client = client
        self._address = address
        self._observer = observer
        self._init_args = init_args

        # build the digest
        if contract.binary_path is not None:
            self._digest = _compute_digest(contract.binary_path)
        elif digest is not None:
            self._digest = digest
        else:
            raise RuntimeError('Unable to determine contract digest')

        # look up the code id if this is the first time
        if code_id is None:
            self._code_id = self._find_contract_id_by_digest(self._digest)
        elif code_id <= 0:
            self._code_id = 0
        else:
            self._code_id = self._find_contract_id_by_digest_with_hint(code_id)

        # if code id is not found, store this as code_id = 0 so we don't keep looking for it
        if self._code_id is None:
            self._code_id = 0

        # trigger the observer if necessary (the code id is never None at this point)
        if self._observer is not None:
            self._observer.on_code_id_update(self._code_id)

        # add methods based on schema
        if contract.schema is not None:
            self._add_queries()
            self._add_executions()

    def store(
            self,
            sender: Wallet,
            gas_limit: Optional[int] = None,
            memo: Optional[str] = None,
    ) -> int:
        """Store the contract code via the parent class and notify the observer.

        :return: the code id returned by ``LedgerContract.store``
        """
        code_id = super().store(sender, gas_limit=gas_limit, memo=memo)

        # trigger the observer if necessary
        if self._observer is not None:
            self._observer.on_code_id_update(code_id)
        return code_id

    def instantiate(
            self,
            code_id: int,
            args: Any,
            sender: Wallet,
            label: Optional[str] = None,
            gas_limit: Optional[int] = None,
            admin_address: Optional[Address] = None,
            funds: Optional[str] = None,
            do_validate: Optional[bool] = True,
    ) -> Address:
        """Instantiate the contract and notify the observer of its address.

        :param args: instantiate message; ``None`` falls back to the configured
            ``init_args``
        :param do_validate: validate ``args`` against the instantiate schema
            (when the contract has one) before sending
        :raises InstantiateArgsError: if neither ``args`` nor ``init_args`` is set
        :return: the address returned by ``LedgerContract.instantiate``
        """
        # if no args provided, insert init args from configuration
        if args is None:
            if self._init_args is not None:
                args = self._init_args
            else:
                raise InstantiateArgsError(
                    'Please provide instantiation arguments either in "args" or in the jenesis.toml configuration for this contract and profile'
                )

        if do_validate and self._contract.instantiate_schema:
            validate(args, self._contract.instantiate_schema)

        address = super().instantiate(code_id, args, sender, label=label, gas_limit=gas_limit,
                                      admin_address=admin_address, funds=funds)

        if self._observer is not None:
            self._observer.on_contract_address_update(address)
        return address

    def deploy(
        self,
        args: Any,
        sender: Wallet,
        label: Optional[str] = None,
        store_gas_limit: Optional[int] = None,
        instantiate_gas_limit: Optional[int] = None,
        admin_address: Optional[Address] = None,
        funds: Optional[str] = None,
        do_validate: Optional[bool] = True,
    ) -> Address:
        """Store the code if needed, then instantiate the contract.

        If an address is already known it is returned without touching the
        ledger. The code is uploaded only when no positive code id is known.

        :return: the address of the deployed contract
        """
        # in the case where the contract is already deployed
        if self._address is not None and self._code_id is not None:
            return self._address

        assert self._address is None

        # Upload the code exactly once, and only when it is not on chain yet.
        # Previously store() was called a second time unconditionally, which
        # re-uploaded the code even when a valid code id was already known.
        code_id = self._code_id
        if code_id is None or code_id <= 0:
            code_id = self.store(sender, gas_limit=store_gas_limit)

        assert code_id

        return self.instantiate(
            code_id,
            args,
            sender,
            label=label,
            gas_limit=instantiate_gas_limit,
            admin_address=admin_address,
            funds=funds,
            do_validate=do_validate,
        )

    def execute(
        self,
        args: Any,
        sender: Wallet,
        gas_limit: Optional[int] = None,
        funds: Optional[str] = None,
        do_validate: Optional[bool] = True,
    ) -> SubmittedTx:
        """Validate ``args`` against the execute schema (if any) and execute them."""
        if do_validate and self._contract.execute_schema:
            validate(args, self._contract.execute_schema)
        return super().execute(args, sender, gas_limit, funds)

    def query(self, args: Any, do_validate: Optional[bool] = True) -> Any:
        """Validate ``args`` against the query schema (if any) and run the query."""
        if do_validate and self._contract.query_schema:
            validate(args, self._contract.query_schema)
        return super().query(args)

    def _find_contract_id_by_digest_with_hint(self, code_id_hint: int) -> Optional[int]:
        """Return ``code_id_hint`` if its stored hash matches, else search by digest.

        Errors whose text contains 'not found' are treated as a miss; any other
        ``grpc.RpcError`` / ``RuntimeError`` is propagated.
        """
        # try and lookup the specified code id
        try:
            req = QueryCodeRequest(code_id=code_id_hint)
            resp = self._client.wasm.Code(req)
            if resp.code_info.data_hash == self.digest:
                return int(resp.code_info.code_id)
        except (grpc.RpcError, RuntimeError) as ex:
            # pylint: disable=no-member
            not_found = (
                (hasattr(ex, 'details') and 'not found' in ex.details())
                or 'not found' in str(ex)
            )
            if not not_found:
                raise

        # hint missing or pointing at different code: fall back to a digest search
        return self._find_contract_id_by_digest(self._digest)

    def _add_queries(self):
        """Attach one helper per query message that is not already an attribute."""
        def make_query(query_msg):
            def query(*args, **kwargs):
                # keyword arguments become the message body; positional ones
                # are forwarded to query() (i.e. do_validate)
                query_args = {query_msg: kwargs}
                return self.query(query_args, *args)
            return query

        for query_msg in self._contract.query_msgs():
            if getattr(self, query_msg, None) is None:
                setattr(self, query_msg, make_query(query_msg))

    def _add_executions(self):
        """Attach one helper per execute message that is not already an attribute."""
        def make_execution(execute_msg):
            def execute(*args, **kwargs):
                # keyword arguments become the message body; positional ones
                # are forwarded to execute() (sender, gas_limit, ...)
                execute_args = {execute_msg: kwargs}
                return self.execute(execute_args, *args)
            return execute

        for execute_msg in self._contract.execute_msgs():
            if getattr(self, execute_msg, None) is None:
                setattr(self, execute_msg, make_execution(execute_msg))

    def __repr__(self):...
wof-validate-properties
Source:wof-validate-properties  
...
        vld_args['exporter'] = exporter

    vld = mapzen.whosonfirst.validator.validator(**vld_args)

    # note the scope so that we keep a pointer to
    # vld and exporter (20150922/thisisaaronland)

    def do_validate(path):
        """Validate the file at `path` with `vld` and log whether it passed."""
        rptr = vld.validate_file(path)

        if rptr.ok():
            logging.debug("%s is OKAY in as much as any of us are really okay..." % path)
        else:
            logging.error("%s FAILED validation" % path)

    def process_args(args):
        # Validate every path in `args`: files directly, directories by
        # crawling them. (The rest of this function is cut off in this excerpt.)
        for path in args:

            path = os.path.abspath(path)
            logging.debug("process %s" % path)

            if os.path.isfile(path):
                do_validate(path)

            elif os.path.isdir(path):

                crawl = mapzen.whosonfirst.utils.crawl(path)

                if not options.multi:
                    # single-process mode: validate each crawled path in turn
                    for p in crawl:
                        do_validate(p)

                else:
                    # NOTE: the `signal` parameter shadows the signal module
                    # inside this handler only.
                    def signal_handler(signal, frame):
                        sys.exit(0)

                    signal.signal(signal.SIGINT, signal_handler)

                    # worker pool with two processes per CPU
                    processes = multiprocessing.cpu_count() * 2
                    pool = multiprocessing.Pool(processes=processes)

                    batch = []
                    batch_size = 10000

                    for path in crawl:
                        batch.append(path)
...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
