Best Python code snippet using lisa_python
key_exchange.py
Source:key_exchange.py  
...115        self._rem_pub_key: Optional[int] = None116        self._priv_key = None117        self._pub_key = None118        self._dh_group = None119    def _create_key_pair(self):120        self._dh_group = dh.DHParameterNumbers(self._pval, self._gval)121        if self._recorder.is_injecting():122            x = self._recorder.inject(x_val=None)123            self._priv_key = load_der_private_key(x, None)124            self._pub_key = self._priv_key.public_key()125        else:126            self._priv_key = self._dh_group.parameters().generate_private_key()127            self._pub_key = self._priv_key.public_key()128            x = self._priv_key.private_bytes(129                Encoding.DER, PrivateFormat.PKCS8, NoEncryption()130            )131            self._recorder.trace(x_val=x)132    def get_shared_secret(self) -> bytes:133        """Get the shared key, i.e., the premaster secret.134        Returns:135            the premaster secret136        """137        if self._priv_key is None:138            self._create_key_pair()139            assert self._priv_key140        rem_pub_numbers = dh.DHPublicNumbers(self._rem_pub_key, self._dh_group)141        rem_pub_key = rem_pub_numbers.public_key()142        return self._priv_key.exchange(rem_pub_key).lstrip(b"\0")143    def set_remote_key(144        self,145        rem_pub_key: bytes,146        g_val: Optional[int] = None,147        p_val: Optional[bytes] = None,148        group: Optional[tls.SupportedGroups] = None,149    ) -> None:150        """Sets the remote public key151        Arguments:152            rem_pub_key: the raw remote public key153            g_val: the generator value154            p_val: the p-value as a byte string155            group: the supported group156        """157        if group is not None:158            dh_nbrs = dh_numbers.dh_numbers.get(group)159            if dh_nbrs is None:160                raise tls.ServerMalfunction(tls.ServerIssue.FFDH_GROUP_UNKNOWN)161            p_val = dh_nbrs.p_val162            g_val = dh_nbrs.g_val163        assert p_val164        self._pval = int.from_bytes(p_val, "big")165        self._gval = g_val166        self._rem_pub_key = int.from_bytes(rem_pub_key, "big")167    def get_transferable_key(self) -> bytes:168        """Get the raw bytes of the key to be sent to the peer.169        Returns:170            the key to be sent to the peer.171        """172        if self._pub_key is None:173            self._create_key_pair()174            assert self._pub_key175        y_val = self._pub_key.public_numbers().y176        return y_val.to_bytes(int(self._pub_key.key_size / 8), "big")177    def get_key_share(self) -> bytes:178        """TLS1.3 alias for get_transferable_key.179        """180        return self.get_transferable_key()181    def set_params(182        self, group_name: Optional[tls.SupportedGroups] = None, **kwargs: Any,183    ) -> None:184        """Sets the group name185        """186        if group_name:187            dh_nbrs = dh_numbers.dh_numbers.get(group_name)188            if dh_nbrs is None:189                raise ValueError(f"No numbers defined for DH-group {group_name.name}")190            self._gval = dh_nbrs.g_val191            self._pval = int.from_bytes(dh_nbrs.p_val, "big")192class EcdhKeyExchange(KeyExchange):193    """Implement an ECDHE key exchange194    """195    def __init__(self, *args: Any, **kwargs: Any) -> None:196        self._algo: Optional[Type[ec.EllipticCurve]] = kwargs.get("algo")197        self._priv_key: Optional[ec.EllipticCurvePrivateKey] = None198        self._pub_key: Optional[bytes] = None199        self._rem_pub_key: Optional[bytes] = None200        super().__init__(*args, **kwargs)201    def _create_key_pair(self) -> None:202        seed = int.from_bytes(os.urandom(10), "big")203        seed = self._recorder.inject(ec_seed=seed)204        assert self._algo205        self._priv_key = ec.derive_private_key(seed, self._algo())206        pub_key = self._priv_key.public_key()207        self._pub_key = pub_key.public_bytes(208            Encoding.X962, PublicFormat.UncompressedPoint209        )210    def get_shared_secret(self) -> bytes:211        """Get the shared key, i.e., the premaster secret.212        Returns:213            the premaster secret214        """215        if self._priv_key is None:216            self._create_key_pair()217        assert self._algo and self._rem_pub_key and self._priv_key218        rem_pub_key = ec.EllipticCurvePublicKey.from_encoded_point(219            self._algo(), bytes(self._rem_pub_key)220        )221        return self._priv_key.exchange(ec.ECDH(), rem_pub_key)222    def set_remote_key(self, rem_pub_key: bytes, **kwargs: Any) -> None:223        """Sets the remote public key224        Arguments:225            rem_pub_key: the raw remote public key226            **kwargs: unused227        """228        self._rem_pub_key = rem_pub_key229    def get_transferable_key(self) -> bytes:230        """Get the raw bytes of the key to be sent to the peer.231        Returns:232            the key to be sent to the peer.233        """234        if self._pub_key is None:235            self._create_key_pair()236            assert self._pub_key237        return self._pub_key238    def get_key_share(self) -> bytes:239        """TLS1.3 alias for get_transferable_key.240        """241        if self._pub_key is None:242            self._create_key_pair()243            assert self._pub_key244        return self._pub_key245    def set_params(246        self,247        group_name: Optional[tls.SupportedGroups] = None,248        algo: Optional[Type[ec.EllipticCurve]] = None,249        **kwargs: Any,250    ) -> None:251        """Sets the versions and the remote public key252        """253        if group_name:254            self._group_name = group_name255        if algo:256            self._algo = algo257class EcdhKeyExchangeCertificate(KeyExchange):258    """Implement the key exchange for ECDHE.259    """260    def set_remote_key(self, rem_pub_key: bytes, **kwargs: Any) -> None:261        """Sets the remote public key262        Arguments:263            rem_pub_key: the raw remote public key264            **kwargs: unused265        """266        pass267    def get_transferable_key(self) -> bytes:268        """Get the raw bytes of the key to be sent to the peer.269        Returns:270            the key to be sent to the peer.271        """272        return self._pub_key273    def get_shared_secret(self) -> bytes:274        """Get the shared key, i.e., the premaster secret.275        Returns:276            the premaster secret277        """278        return self._shared_secret279    def set_params(self, rem_public_key: Optional[Any] = None, **kwargs: Any,) -> None:280        """Sets the remote public key281        """282        if rem_public_key:283            seed = self._recorder.inject(ec_seed=int.from_bytes(os.urandom(10), "big"))284            priv_key = ec.derive_private_key(seed, rem_public_key.curve)285            pub_key = priv_key.public_key()286            self._pub_key: bytes = pub_key.public_bytes(287                Encoding.X962, PublicFormat.UncompressedPoint288            )289            self._shared_secret: bytes = priv_key.exchange(ec.ECDH(), rem_public_key)290class XKeyExchange(KeyExchange):291    """Implement the key exchange for X25519 and X448.292    """293    def __init__(self, *args: Any, **kwargs: Any) -> None:294        super().__init__(*args, **kwargs)295        self._priv_key = None296        self._pub_key = None297        self._rem_pub_key: Optional[bytes] = None298        self._private_key_lib: Union[299            Type[x25519.X25519PrivateKey], Type[x448.X448PrivateKey]300        ]301        self._public_key_lib: Union[302            Type[x25519.X25519PublicKey], Type[x448.X448PublicKey]303        ]304    def _create_key_pair(self):305        if self._recorder.is_injecting():306            priv_bytes = self._recorder.inject(private_key=None)307            self._priv_key = self._private_key_lib.from_private_bytes(priv_bytes)308        else:309            self._priv_key = self._private_key_lib.generate()310            if self._recorder.is_recording():311                priv_bytes = self._priv_key.private_bytes(312                    encoding=Encoding.Raw,313                    format=PrivateFormat.Raw,314                    encryption_algorithm=NoEncryption(),315                )316                self._recorder.trace(private_key=priv_bytes)317        pub_key = self._priv_key.public_key()318        self._pub_key = pub_key.public_bytes(Encoding.Raw, PublicFormat.Raw)319    def get_shared_secret(self) -> bytes:320        """Get the shared key, i.e., the premaster secret.321        Returns:322            the premaster secret323        """324        if self._priv_key is None:325            self._create_key_pair()326            assert self._priv_key327        rem_pub_key = self._public_key_lib.from_public_bytes(bytes(self._rem_pub_key))328        return self._priv_key.exchange(rem_pub_key)329    def set_remote_key(self, rem_pub_key: bytes, **kwargs: Any) -> None:330        """Sets the remote public key331        Arguments:332            rem_pub_key: the raw remote public key333            **kwargs: unused334        """335        self._rem_pub_key = rem_pub_key336    def get_transferable_key(self) -> bytes:337        """Get the raw bytes of the key to be sent to the peer.338        Returns:339            the key to be sent to the peer.340        """341        if self._priv_key is None:342            self._create_key_pair()343            assert self._pub_key344        return self._pub_key345    def get_key_share(self) -> bytes:346        """TLS1.3 alias for get_transferable_key.347        """348        return self.get_transferable_key()349    def set_params(350        self, group_name: Optional[tls.SupportedGroups] = None, **kwargs: Any,351    ) -> None:352        """Sets the group name353        """354        if group_name is tls.SupportedGroups.X25519:355            self._private_key_lib = x25519.X25519PrivateKey356            self._public_key_lib = x25519.X25519PublicKey...key_store.py
Source:key_store.py  
...10class NoKeyPairs(Exception):11    pass12class KeyStore(object):13    @staticmethod14    def _create_key_pair(seed=None):15        if seed is not None:16            if len(seed) > 32:17                raise Exception('max seed length is 32')18            seed = seed.encode('utf-8')19        kwargs = {}20        if seed is not None:21            kwargs['entropy'] = lambda x: seed.ljust(32)22        return ed25519.create_keypair(**kwargs)23    def create_key_pair(self, seed=None):24        raise NotImplementedError25    def sign(self, data, public_key=None):26        raise NotImplementedError27    def get_only_public_key(self):28        raise NotImplementedError29class InMemoryKeyStore(KeyStore):30    def __init__(self):31        self._key_pairs = {}32    def create_key_pair(self, seed=None):33        (secret_key, public_key) = self._create_key_pair(seed)34        encoded = b58.b58encode(public_key.to_bytes()).decode('utf-8')35        self._key_pairs[encoded] = secret_key36        return encoded37    def sign(self, data, public_key=None):38        if public_key is None:39            public_key = self.get_only_public_key()40        secret_key = self._key_pairs[public_key]41        return secret_key.sign(data)42    def get_only_public_key(self):43        if len(self._key_pairs) > 1:44            raise AmbiguousPublicKey45        elif len(self._key_pairs) == 0:46            raise NoKeyPairs47        return list(self._key_pairs.keys())[0]48class FileKeyStore(KeyStore):49    def __init__(self, path):50        self._path = path51    def create_key_pair(self, seed=None):52        if not os.path.exists(self._path):53            os.makedirs(self._path)54        (secret_key, public_key) = self._create_key_pair(seed)55        encoded_pub = b58.b58encode(public_key.to_bytes()).decode('utf-8')56        encoded_secret = b58.b58encode(secret_key.to_bytes()).decode('utf-8')57        with open(os.path.join(self._path, encoded_pub), 'w') as f:58            key_file = {59                'public_key': encoded_pub,60                'secret_key': encoded_secret,61            }62            f.write(json.dumps(key_file))63        return encoded_pub64    def sign(self, data, public_key=None):65        if public_key is None:66            public_key = self.get_only_public_key()67        with open(os.path.join(self._path, public_key)) as f:68            key_file = json.loads(f.read())...test-create-unsigned-csr.py
Source:test-create-unsigned-csr.py  
1#!/usr/bin/python32import unittest3from OpenSSL import crypto4class TestCryptography(unittest.TestCase):5    def _create_key_pair(self, type, bits):6        key_pair = crypto.PKey()7        key_pair.generate_key(type, bits)8        return key_pair9    def test_create_csr_that_has_not_been_signed(self):10        """Generate a CSR that has not been signed."""11        key_pair = self._create_key_pair(crypto.TYPE_RSA, 2048)12        csr = crypto.X509Req()13        subject = csr.get_subject()14        setattr(subject, "CN", "host.example.net")15        csr.set_pubkey(key_pair)16        pem = crypto.dump_certificate_request(crypto.FILETYPE_PEM, csr)17        print(f"pem={pem}")18        return pem19if __name__ == '__main__':...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
