Best Python code snippet using molotov_python
manager.py
Source:manager.py  
"""Classes to manage connections."""

import logging
from typing import Coroutine, Sequence, Tuple

from aries_cloudagent.protocols.coordinate_mediation.v1_0.manager import (
    MediationManager,
)

from ....cache.base import BaseCache
from ....config.base import InjectionError
from ....connections.base_manager import BaseConnectionManager
from ....connections.models.conn_record import ConnRecord
from ....connections.models.connection_target import ConnectionTarget
from ....connections.util import mediation_record_if_id
from ....core.error import BaseError
from ....core.profile import ProfileSession
from ....messaging.responder import BaseResponder
from ....protocols.routing.v1_0.manager import RoutingManager
from ....storage.error import StorageError, StorageNotFoundError
from ....transport.inbound.receipt import MessageReceipt
from ....wallet.base import BaseWallet, DIDInfo
from ....wallet.crypto import create_keypair, seed_to_did
from ....wallet.error import WalletNotFoundError
from ....wallet.util import bytes_to_b58
from ....multitenant.manager import MultitenantManager
from ...coordinate_mediation.v1_0.models.mediation_record import MediationRecord
from .messages.connection_invitation import ConnectionInvitation
from .messages.connection_request import ConnectionRequest
from .messages.connection_response import ConnectionResponse
from .messages.problem_report import ProblemReportReason
from .models.connection_detail import ConnectionDetail


class ConnectionManagerError(BaseError):
    """Connection error."""


class ConnectionManager(BaseConnectionManager):
    """Class for managing connections."""

    def __init__(self, session: ProfileSession):
        """
        Initialize a ConnectionManager.

        Args:
            session: The profile session for this connection manager
        """
        self._session = session
        self._logger = logging.getLogger(__name__)
        super().__init__(self._session)

    @property
    def session(self) -> ProfileSession:
        """
        Accessor for the current profile session.

        Returns:
            The profile session for this connection manager

        """
        return self._session

    async def create_invitation(
        self,
        my_label: str = None,
        my_endpoint: str = None,
        auto_accept: bool = None,
        public: bool = False,
        multi_use: bool = False,
        alias: str = None,
        routing_keys: Sequence[str] = None,
        recipient_keys: Sequence[str] = None,
        metadata: dict = None,
        mediation_id: str = None,
        with_did: str = None,
    ) -> Tuple[ConnRecord, ConnectionInvitation]:
        """
        Generate new connection invitation.

        This interaction represents an out-of-band communication channel. In the future
        and in practice, these sort of invitations will be received over any number of
        channels such as SMS, Email, QR Code, NFC, etc.

        Structure of an invite message:

        ::

            {
                "@type": "https://didcomm.org/connections/1.0/invitation",
                "label": "Alice",
                "did": "did:bid:QmWbsNYhMrjHiqZDTUTEJs"
            }

        Or, in the case of a peer DID:

        ::

            {
                "@type": "https://didcomm.org/connections/1.0/invitation",
                "label": "Alice",
                "did": "did:peer:oiSqsNYhMrjHiqZDTUthsw",
                "recipient_keys": ["8HH5gYEeNc3z7PYXmd54d4x6qAfCNrqQqEB3nS7Zfu7K"],
                "service_endpoint": "https://example.com/endpoint"
                "routing_keys": ["9EH5gYEeNc3z7PYXmd53d5x6qAfCNrqQqEB4nS7Zfu6K"],
            }

        Args:
            my_label: label for this connection
            my_endpoint: endpoint where other party can reach me
            auto_accept: auto-accept a corresponding connection request
                (None to use config)
            public: set to create an invitation from the public DID
            multi_use: set to True to create an invitation for multiple use
            alias: optional alias to apply to connection for later use
            routing_keys: accepted for interface compatibility; the value passed
                in is currently discarded (see NOTE in the body)
            recipient_keys: keys to place in the invitation; when omitted a new
                signing key is created in the wallet and used instead
            metadata: key/value pairs stored on the new connection record
                (not allowed together with `public`)
            mediation_id: record id of the mediation to use for this invitation
            with_did: value stored as `with_did` on the new connection record

        Returns:
            A tuple of the new `ConnRecord` and `ConnectionInvitation` instances.
            For a public invitation the first element is `None`.

        Raises:
            ConnectionManagerError: if a public invitation is requested while
                public invites are disabled, no public DID exists, or `public`
                is combined with `multi_use` or `metadata`

        """
        # Mediation Record can still be None after this operation if no
        # mediation id passed and no default
        mediation_record = await mediation_record_if_id(
            self._session,
            mediation_id,
            or_default=True,
        )
        keylist_updates = None
        image_url = self._session.context.settings.get("image_url")

        # Multitenancy setup
        multitenant_mgr = self._session.inject(MultitenantManager, required=False)
        wallet_id = self._session.settings.get("wallet.id")

        if not my_label:
            my_label = self._session.settings.get("default_label")
        wallet = self._session.inject(BaseWallet)

        if public:
            if not self._session.settings.get("public_invites"):
                raise ConnectionManagerError("Public invitations are not enabled")

            public_did = await wallet.get_public_did()
            if not public_did:
                raise ConnectionManagerError(
                    "Cannot create public invitation with no public DID"
                )

            if multi_use:
                raise ConnectionManagerError(
                    "Cannot use public and multi_use at the same time"
                )

            if metadata:
                raise ConnectionManagerError(
                    "Cannot use public and set metadata at the same time"
                )

            # FIXME - allow ledger instance to format public DID with prefix?
            invitation = ConnectionInvitation(
                label=my_label, did=f"did:bid:{public_did.did}", image_url=image_url
            )

            # Add mapping for multitenant relaying.
            # Mediation of public keys is not supported yet
            if multitenant_mgr and wallet_id:
                await multitenant_mgr.add_key(
                    wallet_id, public_did.verkey, skip_if_exists=True
                )

            return None, invitation

        invitation_mode = ConnRecord.INVITATION_MODE_ONCE
        if multi_use:
            invitation_mode = ConnRecord.INVITATION_MODE_MULTI

        if recipient_keys:
            # TODO: register recipient keys for relay
            # TODO: check that recipient keys are in wallet
            invitation_key = recipient_keys[0]  # TODO first key appropriate?
        else:
            # Create and store new invitation key
            invitation_signing_key = await wallet.create_signing_key()
            invitation_key = invitation_signing_key.verkey
            recipient_keys = [invitation_key]
            mediation_mgr = MediationManager(self._session.profile)
            keylist_updates = await mediation_mgr.add_key(
                invitation_key, keylist_updates
            )

            if multitenant_mgr and wallet_id:
                await multitenant_mgr.add_key(wallet_id, invitation_key)

        accept = (
            ConnRecord.ACCEPT_AUTO
            if (
                auto_accept
                or (
                    auto_accept is None
                    and self._session.settings.get("debug.auto_accept_requests")
                )
            )
            else ConnRecord.ACCEPT_MANUAL
        )

        # Create connection record
        connection = ConnRecord(
            invitation_key=invitation_key,  # TODO: determine correct key to use
            their_role=ConnRecord.Role.REQUESTER.rfc160,
            state=ConnRecord.State.INVITATION.rfc160,
            accept=accept,
            invitation_mode=invitation_mode,
            alias=alias,
            with_did=with_did,
        )

        await connection.save(self._session, reason="Created new invitation")

        # NOTE(review): this rebinding discards any `routing_keys` argument the
        # caller supplied; confirm whether that is intended before changing it.
        routing_keys = []
        my_endpoint = my_endpoint or self._session.settings.get("default_endpoint")

        # The base wallet can act as a mediator for all tenants
        if multitenant_mgr and wallet_id:
            base_mediation_record = await multitenant_mgr.get_default_mediator()

            if base_mediation_record:
                routing_keys = base_mediation_record.routing_keys
                my_endpoint = base_mediation_record.endpoint

                # If we use a mediator for the base wallet we don't
                # need to register the key at the subwallet mediator
                # because it only needs to know the key of the base mediator
                # sub wallet mediator -> base wallet mediator -> agent
                keylist_updates = None

        if mediation_record:
            routing_keys = [*routing_keys, *mediation_record.routing_keys]
            my_endpoint = mediation_record.endpoint

            # Save that this invitation was created with mediation
            # NOTE(review): `mediation_id` is the argument as passed in; it is
            # presumably None when the record came from `or_default=True` --
            # confirm whether the record's own id should be stored instead.
            await connection.metadata_set(
                self._session, "mediation", {"id": mediation_id}
            )

            if keylist_updates:
                # The responder is injected as optional, so it may be absent.
                responder = self._session.inject(BaseResponder, required=False)
                if responder:
                    await responder.send(
                        keylist_updates, connection_id=mediation_record.connection_id
                    )
                else:
                    self._logger.warning(
                        "No responder available; keylist updates were not sent"
                    )

        # Create connection invitation message
        # Note: Need to split this into two stages to support inbound routing of invites
        # Would want to reuse create_did_document and convert the result
        invitation = ConnectionInvitation(
            label=my_label,
            recipient_keys=recipient_keys,
            routing_keys=routing_keys,
            endpoint=my_endpoint,
            image_url=image_url,
        )
        await connection.attach_invitation(self._session, invitation)

        if metadata:
            for key, value in metadata.items():
                await connection.metadata_set(self._session, key, value)

        return connection, invitation

    async def receive_invitation(
        self,
        invitation: ConnectionInvitation,
        their_public_did: str = None,
        auto_accept: bool = None,
        alias: str = None,
        mediation_id: str = None,
        mediation_record: MediationRecord = None,
        with_did: str = None,
    ) -> ConnRecord:
        """
        Create a new connection record to track a received invitation.

        Args:
            invitation: The `ConnectionInvitation` to store
            their_public_did: value stored as `their_public_did` on the record
            auto_accept: set to auto-accept the invitation (None to use config)
            alias: optional alias to set on the record
            mediation_id: record id for mediation, forwarded to `create_request`
                when the invitation is auto-accepted
            mediation_record: not referenced by this method
            with_did: value stored as `with_did` on the record

        Returns:
            The new `ConnRecord` instance

        Raises:
            ConnectionManagerError: if the invitation carries no DID and lacks
                recipient keys or an endpoint

        """
        if not invitation.did:
            if not invitation.recipient_keys:
                raise ConnectionManagerError("Invitation must contain recipient key(s)")
            if not invitation.endpoint:
                raise ConnectionManagerError("Invitation must contain an endpoint")

        accept = (
            ConnRecord.ACCEPT_AUTO
            if (
                auto_accept
                or (
                    auto_accept is None
                    and self._session.settings.get("debug.auto_accept_invites")
                )
            )
            else ConnRecord.ACCEPT_MANUAL
        )

        # Create connection record
        connection = ConnRecord(
            invitation_key=invitation.recipient_keys and invitation.recipient_keys[0],
            their_label=invitation.label,
            their_role=ConnRecord.Role.RESPONDER.rfc160,
            state=ConnRecord.State.INVITATION.rfc160,
            accept=accept,
            alias=alias,
            their_public_did=their_public_did,
            with_did=with_did,
        )

        await connection.save(
            self._session,
            reason="Created new connection record from invitation",
            log_params={"invitation": invitation, "their_label": invitation.label},
        )

        # Save the invitation for later processing
        await connection.attach_invitation(self._session, invitation)

        if connection.accept == ConnRecord.ACCEPT_AUTO:
            request = await self.create_request(connection, mediation_id=mediation_id)
            responder = self._session.inject(BaseResponder, required=False)
            if responder:
                await responder.send(request, connection_id=connection.connection_id)
                # refetch connection for accurate state
                connection = await ConnRecord.retrieve_by_id(
                    self._session, connection.connection_id
                )
        else:
            self._logger.debug("Connection invitation will await acceptance")

        return connection

    async def create_request(
        self,
        connection: ConnRecord,
        my_label: str = None,
        my_endpoint: str = None,
        mediation_id: str = None,
    ) -> ConnectionRequest:
        """
        Create a new connection request for a previously-received invitation.

        Args:
            connection: The `ConnRecord` representing the invitation to accept
            my_label: My label
            my_endpoint: My endpoint
            mediation_id: record id of the mediation to use; the default
                mediation is looked up when omitted

        Returns:
            A new `ConnectionRequest` message to send to the other agent

        """
        keylist_updates = None

        # Mediation Record can still be None after this operation if no
        # mediation id passed and no default
        mediation_record = await mediation_record_if_id(
            self._session,
            mediation_id,
            or_default=True,
        )

        multitenant_mgr = self._session.inject(MultitenantManager, required=False)
        wallet_id = self._session.settings.get("wallet.id")
        base_mediation_record = None

        if multitenant_mgr and wallet_id:
            base_mediation_record = await multitenant_mgr.get_default_mediator()

        my_info = None
        wallet = self._session.inject(BaseWallet)
        if connection.my_did:
            my_info = await wallet.get_local_did(connection.my_did)
        elif connection.with_did:
            # Reuse the DID named on the record instead of creating a new one
            my_info = await wallet.get_local_did(connection.with_did)
            connection.my_did = my_info.did
        else:
            # Create new DID for connection
            my_info = await wallet.create_local_did()
            connection.my_did = my_info.did
            mediation_mgr = MediationManager(self._session.profile)
            keylist_updates = await mediation_mgr.add_key(
                my_info.verkey, keylist_updates
            )
            # Add mapping for multitenant relay
            if multitenant_mgr and wallet_id:
                await multitenant_mgr.add_key(wallet_id, my_info.verkey)

        # Create connection request message
        if my_endpoint:
            my_endpoints = [my_endpoint]
        else:
            my_endpoints = []
            default_endpoint = self._session.settings.get("default_endpoint")
            if default_endpoint:
                my_endpoints.append(default_endpoint)
            my_endpoints.extend(self._session.settings.get("additional_endpoints", []))

        did_doc = await self.create_did_document(
            my_info,
            connection.inbound_connection_id,
            my_endpoints,
            mediation_records=list(
                filter(None, [base_mediation_record, mediation_record])
            ),
        )

        if not my_label:
            my_label = self._session.settings.get("default_label")
        request = ConnectionRequest(
            label=my_label,
            connection=ConnectionDetail(did=connection.my_did, did_doc=did_doc),
            image_url=self._session.settings.get("image_url"),
        )

        # Update connection state
        connection.request_id = request._id
        connection.state = ConnRecord.State.REQUEST.rfc160

        await connection.save(self._session, reason="Created connection request")

        # Notify mediator of keylist changes
        if keylist_updates and mediation_record:
            # send a update keylist message with new recipient keys.
            # The responder is injected as optional, so it may be absent.
            responder = self._session.inject(BaseResponder, required=False)
            if responder:
                await responder.send(
                    keylist_updates, connection_id=mediation_record.connection_id
                )
            else:
                self._logger.warning(
                    "No responder available; keylist updates were not sent"
                )

        return request

    async def receive_request(
        self,
        request: ConnectionRequest,
        receipt: MessageReceipt,
        mediation_id: str = None,
    ) -> ConnRecord:
        """
        Receive and store a connection request.

        Args:
            request: The `ConnectionRequest` to accept
            receipt: The message receipt
            mediation_id: record id of the mediation to notify of keylist
                changes; also forwarded to `create_response` on auto-accept

        Returns:
            The new or updated `ConnRecord` instance

        Raises:
            ConnectionManagerError: if no invitation matches the recipient key,
                the request has no DIDDoc, the request DID differs from the
                DIDDoc id, or the request targets a public DID while public
                invites are disabled

        """
        ConnRecord.log_state(
            "Receiving connection request",
            {"request": request},
            settings=self._session.settings,
        )

        mediation_mgr = MediationManager(self._session.profile)
        keylist_updates = None
        connection = None
        connection_key = None
        my_info = None

        # Multitenancy setup
        multitenant_mgr = self._session.inject(MultitenantManager, required=False)
        wallet_id = self._session.settings.get("wallet.id")

        wallet = self._session.inject(BaseWallet)

        # Determine what key will need to sign the response
        if receipt.recipient_did_public:
            my_info = await wallet.get_local_did(receipt.recipient_did)
            connection_key = my_info.verkey
        else:
            connection_key = receipt.recipient_verkey
            try:
                connection = await ConnRecord.retrieve_by_invitation_key(
                    session=self._session,
                    invitation_key=connection_key,
                    their_role=ConnRecord.Role.REQUESTER.rfc160,
                )
            except StorageNotFoundError:
                raise ConnectionManagerError(
                    "No invitation found for pairwise connection "
                    f"in state {ConnRecord.State.INVITATION.rfc160}: "
                    "a prior connection request may have updated the connection state"
                )

        invitation = None
        if connection:
            invitation = await connection.retrieve_invitation(self._session)
            connection_key = connection.invitation_key
            ConnRecord.log_state(
                "Found invitation",
                {"invitation": invitation},
                settings=self._session.settings,
            )

            if connection.is_multiuse_invitation:
                wallet = self._session.inject(BaseWallet)
                if connection.with_did is not None:
                    my_info = await wallet.get_local_did(connection.with_did)
                    # Delete any existing record found for this DID pair
                    conn = await self.find_connection(
                        request.connection.did, my_info.did
                    )
                    if conn is not None:
                        await conn.delete_record(self._session)
                else:
                    my_info = await wallet.create_local_did()

                keylist_updates = await mediation_mgr.add_key(
                    my_info.verkey, keylist_updates
                )

                new_connection = ConnRecord(
                    invitation_key=connection_key,
                    my_did=my_info.did,
                    state=ConnRecord.State.INVITATION.rfc160,
                    accept=connection.accept,
                    their_role=connection.their_role,
                )

                await new_connection.save(
                    self._session,
                    reason="Received connection request from multi-use invitation DID",
                )

                # Transfer metadata from multi-use to new connection
                # Must come after save so there's an ID to associate with metadata
                for key, value in (
                    await connection.metadata_get_all(self._session)
                ).items():
                    await new_connection.metadata_set(self._session, key, value)

                connection = new_connection

                # Add mapping for multitenant relay
                if multitenant_mgr and wallet_id:
                    await multitenant_mgr.add_key(wallet_id, my_info.verkey)
            else:
                # remove key from mediator keylist
                keylist_updates = await mediation_mgr.remove_key(
                    connection_key, keylist_updates
                )

        conn_did_doc = request.connection.did_doc
        if not conn_did_doc:
            raise ConnectionManagerError(
                "No DIDDoc provided; cannot connect to public DID"
            )
        if request.connection.did != conn_did_doc.did:
            raise ConnectionManagerError(
                "Connection DID does not match DIDDoc id",
                error_code=ProblemReportReason.REQUEST_NOT_ACCEPTED,
            )
        await self.store_did_document(conn_did_doc)

        if connection:
            connection.their_label = request.label
            connection.their_did = request.connection.did
            connection.state = ConnRecord.State.REQUEST.rfc160
            await connection.save(
                self._session, reason="Received connection request from invitation"
            )
        elif not self._session.settings.get("public_invites"):
            raise ConnectionManagerError("Public invitations are not enabled")
        else:  # request from public did
            my_info = await wallet.create_local_did()

            # send update-keylist message with new recipient keys
            keylist_updates = await mediation_mgr.add_key(
                my_info.verkey, keylist_updates
            )

            # Add mapping for multitenant relay
            if multitenant_mgr and wallet_id:
                await multitenant_mgr.add_key(wallet_id, my_info.verkey)

            connection = ConnRecord(
                invitation_key=connection_key,
                my_did=my_info.did,
                their_role=ConnRecord.Role.RESPONDER.rfc160,
                their_did=request.connection.did,
                their_label=request.label,
                accept=(
                    ConnRecord.ACCEPT_AUTO
                    if self._session.settings.get("debug.auto_accept_requests")
                    else ConnRecord.ACCEPT_MANUAL
                ),
                state=ConnRecord.State.REQUEST.rfc160,
            )

            await connection.save(
                self._session, reason="Received connection request from public DID"
            )

        # Attach the connection request so it can be found and responded to
        await connection.attach_request(self._session, request)

        # Send keylist updates to mediator
        mediation_record = await mediation_record_if_id(self._session, mediation_id)
        if keylist_updates and mediation_record:
            # The responder is injected as optional, so it may be absent.
            responder = self._session.inject(BaseResponder, required=False)
            if responder:
                await responder.send(
                    keylist_updates, connection_id=mediation_record.connection_id
                )
            else:
                self._logger.warning(
                    "No responder available; keylist updates were not sent"
                )

        if connection.accept == ConnRecord.ACCEPT_AUTO:
            response = await self.create_response(connection, mediation_id=mediation_id)
            responder = self._session.inject(BaseResponder, required=False)
            if responder:
                await responder.send_reply(
                    response, connection_id=connection.connection_id
                )
                # refetch connection for accurate state
                connection = await ConnRecord.retrieve_by_id(
                    self._session, connection.connection_id
                )
        else:
            self._logger.debug("Connection request will await acceptance")

        return connection

    async def create_response(
        self,
        connection: ConnRecord,
        my_endpoint: str = None,
        mediation_id: str = None,
    ) -> ConnectionResponse:
        """
        Create a connection response for a received connection request.

        Args:
            connection: The `ConnRecord` with a pending connection request
            my_endpoint: The endpoint I can be reached at
            mediation_id: The record id for mediation that contains routing_keys and
                service endpoint

        Returns:
            The new `ConnectionResponse` message

        Raises:
            ConnectionManagerError: if the connection is not in the request or
                response state

        """
        ConnRecord.log_state(
            "Creating connection response",
            {"connection_id": connection.connection_id},
            settings=self._session.settings,
        )

        keylist_updates = None
        mediation_record = await mediation_record_if_id(self._session, mediation_id)

        # Multitenancy setup
        multitenant_mgr = self._session.inject(MultitenantManager, required=False)
        wallet_id = self._session.settings.get("wallet.id")
        base_mediation_record = None

        if multitenant_mgr and wallet_id:
            base_mediation_record = await multitenant_mgr.get_default_mediator()

        if ConnRecord.State.get(connection.state) not in (
            ConnRecord.State.REQUEST,
            ConnRecord.State.RESPONSE,
        ):
            raise ConnectionManagerError(
                "Connection is not in the request or response state"
            )

        request = await connection.retrieve_request(self._session)
        wallet = self._session.inject(BaseWallet)
        if connection.my_did:
            my_info = await wallet.get_local_did(connection.my_did)
        elif connection.with_did is not None:
            my_info = await wallet.get_local_did(connection.with_did)
            # Delete any existing record found for this DID pair
            conn = await self.find_connection(connection.their_did, my_info.did)
            if conn is not None:
                await conn.delete_record(self._session)
        else:
            my_info = await wallet.create_local_did()
            connection.my_did = my_info.did
            mediation_mgr = MediationManager(self._session.profile)
            keylist_updates = await mediation_mgr.add_key(
                my_info.verkey, keylist_updates
            )
            # Add mapping for multitenant relay
            if multitenant_mgr and wallet_id:
                await multitenant_mgr.add_key(wallet_id, my_info.verkey)

        # Create connection response message
        if my_endpoint:
            my_endpoints = [my_endpoint]
        else:
            my_endpoints = []
            default_endpoint = self._session.settings.get("default_endpoint")
            if default_endpoint:
                my_endpoints.append(default_endpoint)
            my_endpoints.extend(self._session.settings.get("additional_endpoints", []))

        did_doc = await self.create_did_document(
            my_info,
            connection.inbound_connection_id,
            my_endpoints,
            mediation_records=list(
                filter(None, [base_mediation_record, mediation_record])
            ),
        )

        response = ConnectionResponse(
            connection=ConnectionDetail(did=my_info.did, did_doc=did_doc)
        )

        # Assign thread information
        response.assign_thread_from(request)
        response.assign_trace_from(request)

        # Sign connection field using the invitation key
        wallet = self._session.inject(BaseWallet)
        await response.sign_field("connection", connection.invitation_key, wallet)

        # Update connection state
        connection.state = ConnRecord.State.RESPONSE.rfc160

        await connection.save(
            self._session,
            reason="Created connection response",
            log_params={"response": response},
        )

        # Update mediator if necessary
        if keylist_updates and mediation_record:
            # The responder is injected as optional, so it may be absent.
            responder = self._session.inject(BaseResponder, required=False)
            if responder:
                await responder.send(
                    keylist_updates, connection_id=mediation_record.connection_id
                )
            else:
                self._logger.warning(
                    "No responder available; keylist updates were not sent"
                )

        # TODO It's possible the mediation request sent here might arrive
        # before the connection response. This would result in an error condition
        # difficult to accommodate for without modifying handlers for trust ping
        # to ensure the connection is active.
        send_mediation_request = await connection.metadata_get(
            self._session, MediationManager.SEND_REQ_AFTER_CONNECTION
        )
        if send_mediation_request:
            mgr = MediationManager(self._session.profile)
            _record, request = await mgr.prepare_request(connection.connection_id)
            responder = self._session.inject(BaseResponder)
            await responder.send(request, connection_id=connection.connection_id)

        return response

    async def accept_response(
        self, response: ConnectionResponse, receipt: MessageReceipt
    ) -> ConnRecord:
        """
        Accept a connection response.

        Process a ConnectionResponse message by looking up
        the connection request and setting up the pairwise connection.

        Args:
            response: The `ConnectionResponse` to accept
            receipt: The message receipt

        Returns:
            The updated `ConnRecord` representing the connection

        Raises:
            ConnectionManagerError: If there is no DID associated with the
                connection response
            ConnectionManagerError: If the corresponding connection is not
                at the request or response stage

        """
        connection = None
        if response._thread:
            # identify the request by the thread ID
            try:
                connection = await ConnRecord.retrieve_by_request_id(
                    self._session, response._thread_id
                )
            except StorageNotFoundError:
                # Fall through to the lookup by DID below
                pass

        if not connection and receipt.sender_did:
            # identify connection by the DID they used for us
            try:
                connection = await ConnRecord.retrieve_by_did(
                    self._session, receipt.sender_did, receipt.recipient_did
                )
            except StorageNotFoundError:
                # Handled by the `if not connection` check below
                pass

        if not connection:
            raise ConnectionManagerError(
                "No corresponding connection request found",
                error_code=ProblemReportReason.RESPONSE_NOT_ACCEPTED,
            )

        if ConnRecord.State.get(connection.state) not in (
            ConnRecord.State.REQUEST,
            ConnRecord.State.RESPONSE,
        ):
            raise ConnectionManagerError(
                "Cannot accept connection response for connection"
                f" in state: {connection.state}"
            )

        their_did = response.connection.did
        conn_did_doc = response.connection.did_doc
        if not conn_did_doc:
            raise ConnectionManagerError(
                "No DIDDoc provided; cannot connect to public DID"
            )
        if their_did != conn_did_doc.did:
            raise ConnectionManagerError("Connection DID does not match DIDDoc id")
        await self.store_did_document(conn_did_doc)

        connection.their_did = their_did
        connection.state = ConnRecord.State.RESPONSE.rfc160

        await connection.save(self._session, reason="Accepted connection response")

        send_mediation_request = await connection.metadata_get(
            self._session, MediationManager.SEND_REQ_AFTER_CONNECTION
        )
        if send_mediation_request:
            mgr = MediationManager(self._session.profile)
            _record, request = await mgr.prepare_request(connection.connection_id)
            responder = self._session.inject(BaseResponder)
            await responder.send(request, connection_id=connection.connection_id)

        return connection

    async def get_endpoints(self, conn_id: str) -> Tuple[str, str]:
        """
        Get connection endpoints.

        Args:
            conn_id: connection identifier

        Returns:
            A tuple of my endpoint (from the local DID's metadata, falling back
            to the `default_endpoint` setting) and the endpoint of the first
            connection target for this connection

        """
        connection = await ConnRecord.retrieve_by_id(self._session, conn_id)

        wallet = self._session.inject(BaseWallet)
        my_did_info = await wallet.get_local_did(connection.my_did)
        my_endpoint = my_did_info.metadata.get(
            "endpoint",
            self._session.settings.get("default_endpoint"),
        )

        conn_targets = await self.get_connection_targets(
            connection_id=connection.connection_id,
            connection=connection,
        )
        return (my_endpoint, conn_targets[0].endpoint)

    async def create_static_connection(
        self,
        my_did: str = None,
        my_seed: str = None,
        their_did: str = None,
        their_seed: str = None,
        their_verkey: str = None,
        their_endpoint: str = None,
        their_label: str = None,
        alias: str = None,
    ) -> Tuple[DIDInfo, DIDInfo, ConnRecord]:
        """
        Register a new static connection (for use by the test suite).

        Args:
            my_did: override the DID used in the connection
            my_seed: provide a seed used to generate our DID and keys
            their_did: provide the DID used by the other party
            their_seed: provide a seed used to generate their DID and keys
            their_verkey: provide the verkey used by the other party
            their_endpoint: their URL endpoint for routing messages
            alias: an alias for this connection record

        Returns:
            Tuple: my DIDInfo, their DIDInfo, new `ConnRecord` instance

        """
        wallet = self._session.inject(BaseWallet)

        # Multitenancy setup
        multitenant_mgr = self._session.inject(MultitenantManager, required=False)
     wallet_id = self._session.settings.get("wallet.id")753        base_mediation_record = None754        # seed and DID optional755        my_info = await wallet.create_local_did(my_seed, my_did)756        # must provide their DID and verkey if the seed is not known757        if (not their_did or not their_verkey) and not their_seed:758            raise ConnectionManagerError(759                "Either a verkey or seed must be provided for the other party"760            )761        if not their_did:762            their_did = seed_to_did(their_seed)763        if not their_verkey:764            their_verkey_bin, _ = create_keypair(their_seed.encode())765            their_verkey = bytes_to_b58(their_verkey_bin)766        their_info = DIDInfo(their_did, their_verkey, {})767        # Create connection record768        connection = ConnRecord(769            invitation_mode=ConnRecord.INVITATION_MODE_STATIC,770            my_did=my_info.did,771            their_did=their_info.did,772            their_label=their_label,773            state=ConnRecord.State.COMPLETED.rfc160,774            alias=alias,775        )776        await connection.save(self._session, reason="Created new static connection")777        # Add mapping for multitenant relaying / mediation778        if multitenant_mgr and wallet_id:779            base_mediation_record = await multitenant_mgr.get_default_mediator()780            await multitenant_mgr.add_key(wallet_id, my_info.verkey)781        # Synthesize their DID doc782        did_doc = await self.create_did_document(783            their_info,784            None,785            [their_endpoint],786            mediation_records=[base_mediation_record]787            if base_mediation_record788            else None,789        )790        await self.store_did_document(did_doc)791        return my_info, their_info, connection792    async def find_connection(793            self,794            their_did: str,795            my_did: str = None,796            
my_verkey: str = None,797            auto_complete=False,798    ) -> ConnRecord:799        """800        Look up existing connection information for a sender verkey.801        Args:802            their_did: Their DID803            my_did: My DID804            my_verkey: My verkey805            auto_complete: Should this connection automatically be promoted to active806        Returns:807            The located `ConnRecord`, if any808        """809        # self._log_state(810        #    "Finding connection",811        #    {"their_did": their_did, "my_did": my_did, "my_verkey": my_verkey},812        # )813        connection = None814        if their_did:815            try:816                connection = await ConnRecord.retrieve_by_did(817                    self._session, their_did, my_did818                )819            except StorageNotFoundError:820                pass821        if (822                connection823                and ConnRecord.State.get(connection.state) is ConnRecord.State.RESPONSE824                and auto_complete825        ):826            connection.state = ConnRecord.State.COMPLETED.rfc160827            await connection.save(self._session, reason="Connection promoted to active")828        if not connection and my_verkey:829            try:830                connection = await ConnRecord.retrieve_by_invitation_key(831                    self._session,832                    my_verkey,833                    their_role=ConnRecord.Role.REQUESTER.rfc160,834                )835            except StorageError:836                pass837        return connection838    async def find_inbound_connection(self, receipt: MessageReceipt) -> ConnRecord:839        """840        Deserialize an incoming message and further populate the request context.841        Args:842            receipt: The message receipt843        Returns:844            The `ConnRecord` associated with the expanded message, if any845        """846        cache_key = None847        
connection = None848        resolved = False849        if receipt.sender_verkey and receipt.recipient_verkey:850            cache_key = (851                f"connection_by_verkey::{receipt.sender_verkey}"852                f"::{receipt.recipient_verkey}"853            )854            cache = self._session.inject(BaseCache, required=False)855            if cache:856                async with cache.acquire(cache_key) as entry:857                    if entry.result:858                        cached = entry.result859                        receipt.sender_did = cached["sender_did"]860                        receipt.recipient_did_public = cached["recipient_did_public"]861                        receipt.recipient_did = cached["recipient_did"]862                        try:863                            connection = await ConnRecord.retrieve_by_id(864                                self._session, cached["id"]865                            )866                        except:867                            connection = await self.resolve_inbound_connection(receipt)868                            if connection:869                                cache.release(cache_key)870                            resolved = True871                    else:872                        connection = await self.resolve_inbound_connection(receipt)873                        if connection:874                            cache_val = {875                                "id": connection.connection_id,876                                "sender_did": receipt.sender_did,877                                "recipient_did": receipt.recipient_did,878                                "recipient_did_public": receipt.recipient_did_public,879                            }880                            await entry.set_result(cache_val, 3600)881                        resolved = True882        if not connection and not resolved:883            connection = await self.resolve_inbound_connection(receipt)884        return 
connection885    async def resolve_inbound_connection(self, receipt: MessageReceipt) -> ConnRecord:886        """887        Populate the receipt DID information and find the related `ConnRecord`.888        Args:889            receipt: The message receipt890        Returns:891            The `ConnRecord` associated with the expanded message, if any892        """893        if receipt.sender_verkey:894            try:895                receipt.sender_did = await self.find_did_for_key(receipt.sender_verkey)896            except StorageNotFoundError:897                self._logger.warning(898                    "No corresponding DID found for sender verkey: %s",899                    receipt.sender_verkey,900                )901        if receipt.recipient_verkey:902            try:903                wallet = self._session.inject(BaseWallet)904                my_info = await wallet.get_local_did_for_verkey(905                    receipt.recipient_verkey906                )907                receipt.recipient_did = my_info.did908                if "public" in my_info.metadata and my_info.metadata["public"] is True:909                    receipt.recipient_did_public = True910            except InjectionError:911                self._logger.warning(912                    "Cannot resolve recipient verkey, no wallet defined by "913                    "context: %s",914                    receipt.recipient_verkey,915                )916            except WalletNotFoundError:917                self._logger.warning(918                    "No corresponding DID found for recipient verkey: %s",919                    receipt.recipient_verkey,920                )921        return await self.find_connection(922            receipt.sender_did, receipt.recipient_did, receipt.recipient_verkey, True923        )924    async def get_connection_targets(925            self, *, connection_id: str = None, connection: ConnRecord = None926    ):927        """Create a connection target from a 
`ConnRecord`.928        Args:929            connection_id: The connection ID to search for930            connection: The connection record itself, if already available931        """932        if not connection_id:933            connection_id = connection.connection_id934        cache = self._session.inject(BaseCache, required=False)935        cache_key = f"connection_target::{connection_id}"936        if cache:937            async with cache.acquire(cache_key) as entry:938                if entry.result:939                    targets = [940                        ConnectionTarget.deserialize(row) for row in entry.result941                    ]942                else:943                    if not connection:944                        connection = await ConnRecord.retrieve_by_id(945                            self._session, connection_id946                        )947                    targets = await self.fetch_connection_targets(connection)948                    await entry.set_result([row.serialize() for row in targets], 3600)949        else:950            targets = await self.fetch_connection_targets(connection)951        return targets952    async def establish_inbound(953            self,954            connection: ConnRecord,955            inbound_connection_id: str,956            outbound_handler: Coroutine,957    ) -> str:958        """Assign the inbound routing connection for a connection record.959        Returns: the current routing state (request or done)960        """961        # The connection must have a verkey, but in the case of a received962        # invitation we might not have created one yet963        wallet = self._session.inject(BaseWallet)964        if connection.my_did:965            my_info = await wallet.get_local_did(connection.my_did)966        else:967            # Create new DID for connection968            my_info = await wallet.create_local_did()969            connection.my_did = my_info.did970        try:971            router = 
await ConnRecord.retrieve_by_id(972                self._session, inbound_connection_id973            )974        except StorageNotFoundError:975            raise ConnectionManagerError(976                f"Routing connection not found: {inbound_connection_id}"977            )978        if not router.is_ready:979            raise ConnectionManagerError(980                f"Routing connection is not ready: {inbound_connection_id}"981            )982        connection.inbound_connection_id = inbound_connection_id983        route_mgr = RoutingManager(self._session.profile)984        await route_mgr.send_create_route(985            inbound_connection_id, my_info.verkey, outbound_handler986        )987        connection.routing_state = ConnRecord.ROUTING_STATE_REQUEST988        await connection.save(self._session)989        return connection.routing_state990    async def update_inbound(991            self, inbound_connection_id: str, recip_verkey: str, routing_state: str992    ):993        """Activate connections once a route has been established.994        Looks up pending connections associated with the inbound routing995        connection and marks the routing as complete.996        """997        conns = await ConnRecord.query(998            self._session, {"inbound_connection_id": inbound_connection_id}999        )1000        wallet = self._session.inject(BaseWallet)1001        for connection in conns:1002            # check the recipient key1003            if not connection.my_did:1004                continue1005            conn_info = await wallet.get_local_did(connection.my_did)1006            if conn_info.verkey == recip_verkey:1007                connection.routing_state = routing_state...session.py
Source:session.py  
...109                            remote_ua_version = user_agent_ua_major110            return remote_hw_family, remote_hw_model, remote_os_family, remote_os_version, remote_ua_family, remote_ua_version111        except:112            return '', '', '', '', user_agent, ''113    def _new_session(self):114        from ..models import Session115        address = self.request.META.get('REMOTE_ADDR', '')116        geoname = self.request.META.get(self._geoname_header_name, '')117        user_agent = self.request.META.get('HTTP_USER_AGENT', '')118        remote_hw_family, remote_hw_model, remote_os_family, remote_os_version, remote_ua_family, remote_ua_version = self._parse_user_agent(user_agent)119        self._session = Session(120            previous_session=self._session,121            remote_address=address,122            remote_geoname=geoname,123            remote_hw_family=remote_hw_family,124            remote_hw_model=remote_hw_model,125            remote_os_family=remote_os_family,126            remote_os_version=remote_os_version,127            remote_ua_family=remote_ua_family,128            remote_ua_version=remote_ua_version)129    def _get_session(self, now, uuid):130        from ..models import Session131        address = self.request.META.get('REMOTE_ADDR', '')132        geoname = self.request.META.get(self._geoname_header_name, '')133        user_agent = self.request.META.get('HTTP_USER_AGENT', '')134        remote_hw_family, remote_hw_model, remote_os_family, remote_os_version, remote_ua_family, remote_ua_version = self._parse_user_agent(user_agent)135        try:136            self._session = Session.objects.get(137                uuid=uuid,138                valid_from__lte=now,139                valid_till__gte=now,140                remote_geoname=geoname,141                remote_hw_family=remote_hw_family,142                remote_os_family=remote_os_family,143                remote_ua_family=remote_ua_family)144            
self._session.remote_hw_model = remote_hw_model145            self._session.remote_os_version = remote_os_version146            self._session.remote_ua_version = remote_ua_version147        except Session.DoesNotExist:148            self._session = Session(149                previous_session=self._session,150                remote_address=address,151                remote_geoname=geoname,152                remote_hw_family=remote_hw_family,153                remote_hw_model=remote_hw_model,154                remote_os_family=remote_os_family,155                remote_os_version=remote_os_version,156                remote_ua_family=remote_ua_family,157                remote_ua_version=remote_ua_version)158    def __init__(self, request):159        from collections import OrderedDict160        from django.conf import settings161        from ..models import Principal162        self._session = None163        self._variables = OrderedDict()164        self._geoname_header_name = getattr(settings, 'TALOS_GEONAME_HEADER', '')165        self.request = request166        self.request.session = self167        self.principal = Principal.objects.get(id=0)168        self.principal._load_authentication_context([])169    def init(self):170        self._new_session()171        self.request.principal = self.principal172    def load(self, uuid):173        from ..models import _tznow174        from ..models import Principal175        from json import loads176        now = _tznow()177        self._get_session(now, uuid)178        self._variables = loads(self._session.variables, cls=CustomJSONDecoder) if self._session.variables else {}179        if self._session.principal:180            self.principal = self._session.principal181            self.principal._inject_authentication_context(182                self._session.evidences,183                self._session.roles,184                self._session.privileges,185                self._session.model_actions)186        authentication_period 
= (now - self._session.valid_from).total_seconds()187        valid_evidences = []188        for evidence in self.principal._evidences_effective.values():189            if authentication_period < evidence.expiration_period:190                valid_evidences.append(evidence)191        if len(valid_evidences) != len(self.principal._evidences_effective) or len(valid_evidences) == 0:192            if len(valid_evidences) == 0:193                self.principal = Principal.objects.get(id=0)194            self.principal._load_authentication_context(valid_evidences)195            self._session.valid_from = now196        self.request.principal = self.principal197    def save(self):198        from ..models import Principal199        from django.utils.functional import LazyObject200        from json import dumps201        prev_evidences = self._session.evidences202        prev_roles = self._session.roles203        prev_privileges = self._session.privileges204        prev_model_actions = self._session.model_actions205        prev_variables = self._session.variables206        prev_principal = self._session.principal207        self._session.variables = dumps(self._variables, cls=CustomJSONEncoder)208        if self.request.principal and self.request.principal.is_authenticated:209            if issubclass(type(self.request.principal), LazyObject):210                self.request.principal._setup()211                if type(self.request.principal._wrapped) == Principal:212                    self._session.principal = self.request.principal._wrapped213            elif type(self.request.principal) == Principal:214                self._session.principal = self.request.principal215            if self._session.principal:216                self._session.evidences, \217                    self._session.roles, \218                    self._session.privileges, \219                    self._session.model_actions = self._session.principal._extract_authentication_context()220        else:221    
        self._session.evidences = ''222            self._session.roles = ''223            self._session.privileges = ''224            self._session.model_actions = ''225        if (226            (self._session.evidences != prev_evidences) or227            (self._session.roles != prev_roles) or228            (self._session.privileges != prev_privileges) or229            (self._session.model_actions != prev_model_actions) or230            (self._session.variables != prev_variables) or231            (self._session.principal != prev_principal)):232            self._session.save()233    def get(self, key, default=None):234        return self._variables.get(key, default)235    def pop(self, key, default=None):236        if key in self._variables:237            return self._variables.pop(key, default)238        return None239    def setdefault(self, key, default=None):240        if key in self._variables:241            return self._variables[key]242        else:243            self._variables[key] = default244            return default245    def set_test_cookie(self):246        self[self.TEST_COOKIE_NAME] = self.TEST_COOKIE_VALUE247    def test_cookie_worked(self):248        return self.get(self.TEST_COOKIE_NAME) == self.TEST_COOKIE_VALUE249    def delete_test_cookie(self):250        del self[self.TEST_COOKIE_NAME]251    def update(self, other):252        self._variables.update(other)253    def has_key(self, key):254        return key in self.__store255    def keys(self):256        return self._variables.keys()257    def values(self):258        return self._variables.values()259    def items(self):260        return self._variables.items()261    def iterkeys(self):262        return self._variables.iterkeys()263    def itervalues(self):264        return self._variables.itervalues()265    def iteritems(self):266        return self._variables.iteritems()267    def clear(self):268        from collections import OrderedDict269        self._variables = OrderedDict()270    def 
is_empty(self):271        try:272            return len(self._variables) == 0273        except AttributeError:274            return True275    def flush(self):276        from ..models import _tznow277        from ..models import Principal278        if self._session:279            self._session.valid_till = _tznow()280            self._session.save()281            self._new_session()282            self.principal = Principal.objects.get(id=0)283            self.principal._load_authentication_context([])284            self.request.principal = self.principal285    def cycle_key(self):286        if self._session:287            from uuid import uuid4288            self._session.uuid = uuid4()289    def __contains__(self, key):290        return key in self._variables291    def __getitem__(self, key):292        return self._variables[key]293    def __setitem__(self, key, value):294        if key not in ('_auth_user_backend', '_auth_user_hash', '_auth_user_id'):295            self._variables[key] = value...test_overthere_session.py
Source:test_overthere_session.py  
1#2# Copyright 2020 XEBIALABS3#4# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:5#6# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.7#8# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.9#10from __future__ import with_statement11import xlnoserunner12import os13from overtherepy import LocalConnectionOptions, OverthereHost, OverthereHostSession, OverthereSessionLogger, Diff14from com.xebialabs.overthere import OperatingSystemFamily15from java.io import File16from java.lang import Thread17from nose.tools import ok_, eq_18class TestOverthereSession(object):19    # preparing to test20    def setUp(self):21        self._linuxhost = OverthereHost(LocalConnectionOptions(os=OperatingSystemFamily.UNIX))22        self._session = OverthereHostSession(self._linuxhost)23        self._session.logger = OverthereSessionLogger(capture=True)24    # ending the test25    def tearDown(self):26        self._session.close_conn()27    def _clear_logs(self):28        self._session.logger.output_lines = []29        self._session.logger.error_lines = []30    def assert_in_list(self, list, expect):31        r = [s for s in list if 
str(s) == str(expect)]32        eq_(len(r), 1, "expect [%s] to be in list [%s]" % (expect, str(list)))33    def test_not_windows_host(self):34        ok_(not self._session.is_windows())35    def test_work_dir_cleanup(self):36        workdir = self._session.work_dir()37        ok_(workdir.exists())38        workdir_path = workdir.path39        self._session.close_conn()40        ok_(not os.path.exists(workdir_path))41    def test_read_write_file(self):42        f = self._session.upload_text_content_to_work_dir("some text", "my.txt")43        text = self._session.read_file(f.path)44        eq_("some text", text)45    def test_upload_executable_file(self):46        f = self._session.upload_text_content_to_work_dir("#!/bin/sh\necho hello", "my.sh", executable=True)47        r = self._session.execute(f.path)48        self.assert_in_list(r.stdout, "hello")49    def test_upload_classpath_resource(self):50        f = self._session.upload_classpath_resource_to_work_dir("testfiles/singleline.txt")51        text = self._session.read_file(f.path)52        eq_("some text 1", text)53    def test_read_lines(self):54        f = self._session.upload_classpath_resource_to_work_dir("testfiles/multiline.txt")55        lines = self._session.read_file_lines(f.path)56        eq_(3, len(lines))57    def test_execute(self):58        f = self._session.upload_classpath_resource_to_work_dir("testfiles/echo.sh", executable=True)59        response = self._session.execute([f.path, "ping"])60        eq_(response['rc'], 0)61        eq_(response['stdout'][0], "Hi ping")62    def test_execute_automatic_system_exit_on_failure(self):63        success = False64        try:65            f = self._session.upload_classpath_resource_to_work_dir("testfiles/echo.sh", executable=True)66            self._session.execute([f.path, "ping", "1"])67        except SystemExit:68            success = True69            pass70        eq_(success, True)71    def test_execute_check_success_turned_off(self):72        f = 
self._session.upload_classpath_resource_to_work_dir("testfiles/echo.sh", executable=True)73        response = self._session.execute("%s ping 1" % f.path, check_success=False)74        eq_(response.rc, 1)75        eq_(response['stdout'][0], "Hi ping")76        eq_(response['stdout'][1], "Exiting with 1")77    def test_execute_check_success_turned_off_and_spaces_in_parameters(self):78        f = self._session.upload_classpath_resource_to_work_dir("testfiles/echo.sh", executable=True)79        response = self._session.execute("%s 'ping pong' 2" % f.path, check_success=False)80        eq_(response.rc, 2)81        eq_(response['stdout'][0], "Hi ping pong")82        eq_(response['stdout'][1], "Exiting with 2")83    def test_with_support(self):84        s = OverthereHostSession(self._linuxhost)85        with s:86            work_dir = s.work_dir().path87            eq_(os.path.exists(work_dir), True)88        eq_(os.path.exists(work_dir), False)89    def _local_file(self, resource):90        url = Thread.currentThread().contextClassLoader.getResource(resource)91        return self._session.local_file(File(url.toURI()))92    def test_mkdirs_on_dir_copy(self):93        target = self._session.work_dir_file("some/path")94        source = self._local_file("testfiles")95        eq_(os.path.exists(target.path), False)96        self._session.copy_to(source, target)97        eq_(os.path.exists(target.path), True)98        eq_(os.path.exists(target.path + "/echo.sh"), True)99        log_info = self._session.logger.output_lines100        eq_(len(log_info), 2)101        eq_(log_info[0], "Creating path %s" % target.path)102        eq_(log_info[1], "Copying %s to %s" % (source.path, target.path))103    def test_mkdirs_on_file_copy(self):104        target = self._session.work_dir_file("some/path/some.txt")105        source = self._local_file("testfiles/echo.sh")106        eq_(os.path.exists(target.path), False)107        self._session.copy_to(source, target)108        
eq_(os.path.exists(target.path), True)109        log_info = self._session.logger.output_lines110        eq_(len(log_info), 2)111        eq_(log_info[0], "Creating path %s" % target.parentFile.path)112        eq_(log_info[1], "Copying %s to %s" % (source.path, target.path))113    def test_mkdirs_on_turned_off_on_copy(self):114        target = self._session.work_dir_file("some/path")115        source = self._local_file("testfiles")116        success = False117        try:118            self._session.copy_to(source, target, mkdirs=False)119        except:120            success = True121        eq_(success, True)122    def test_delete_from_when_target_does_not_exit(self):123        target = self._session.work_dir_file("some/path")124        source = None125        self._session.delete_from(source, target)126        log_info = self._session.logger.output_lines127        eq_(len(log_info), 1)128        eq_(log_info[0], "Target [%s] does not exist. No deletion to be performed" % target.path)129    def test_delete_from_file_target(self):130        target = self._session.work_dir_file("some/path/test.txt")131        self._session.copy_text_to_file("xxx", target)132        eq_(os.path.exists(target.path), True)133        source = None134        self._session.delete_from(source, target)135        eq_(os.path.exists(target.path), False)136        log_info = self._session.logger.output_lines137        eq_(len(log_info), 2)138        self.assert_in_list(log_info, "Deleting [%s]" % target.path)139    def test_delete_from_folder_target(self):140        target = self._session.work_dir_file("some/path")141        source = self._local_file("testfiles")142        self._session.copy_to(source, target)143        eq_(os.path.exists(target.path), True)144        self._clear_logs()145        self._session.delete_from(source, target)146        eq_(os.path.exists(target.path), False)147        log_info = self._session.logger.output_lines148        eq_(len(log_info), 5)149        
self.assert_in_list(log_info, "Recursively deleting directory [%s/asubdir]" % target.path)150        self.assert_in_list(log_info, "Deleting file [%s/echo.sh]" % target.path)151        self.assert_in_list(log_info, "Deleting file [%s/multiline.txt]" % target.path)152        self.assert_in_list(log_info, "Deleting file [%s/singleline.txt]" % target.path)153        self.assert_in_list(log_info, "Deleting directory [%s]" % target.path)154    def test_delete_from_with_external_resources(self):155        target = self._session.work_dir_file("some/path")156        source = self._local_file("testfiles")157        self._session.copy_to(source, target)158        eq_(os.path.exists(target.path), True)159        external_resource = self._session.work_dir_file("some/path/ext.txt")160        self._session.copy_text_to_file("xxx", external_resource)161        eq_(os.path.exists(external_resource.path), True)162        self._clear_logs()163        self._session.delete_from(source, target)164        eq_(os.path.exists(target.path), True)165        log_info = self._session.logger.output_lines166        eq_(len(log_info), 5)167        self.assert_in_list(log_info, "Recursively deleting directory [%s/asubdir]" % target.path)168        self.assert_in_list(log_info, "Deleting file [%s/echo.sh]" % target.path)169        self.assert_in_list(log_info, "Deleting file [%s/multiline.txt]" % target.path)170        self.assert_in_list(log_info, "Deleting file [%s/singleline.txt]" % target.path)171        self.assert_in_list(log_info, "Target directory [%s] is not shared, but still has content from an external source. 
Will not delete" % target.path)172    def test_copy_diff(self):173        old = self._local_file("directorycompare/old")174        new = self._local_file("directorycompare/new")175        deployed_old = self._session.work_dir_file("olddeployed")176        self._session.copy_to(old, deployed_old)177        eq_(os.path.exists(deployed_old.path), True)178        self._clear_logs()179        diff = Diff.calculate_diff(old, new)180        self._session.copy_diff(deployed_old.path, diff)181        log_info = self._session.logger.output_lines182        eq_(len(log_info), 17)183        self.assert_in_list(log_info, "3 files to be removed.")184        self.assert_in_list(log_info, "3 new files to be copied.")185        self.assert_in_list(log_info, "2 modified files to be copied.")186        self.assert_in_list(log_info, "Start removal of files...")187        self.assert_in_list(log_info, "Removing file %s/removefile.txt" % deployed_old.path)188        self.assert_in_list(log_info, "Removing directory %s/subdirremove" % deployed_old.path)189        self.assert_in_list(log_info, "Removing file %s/subdirboth/removefile.txt" % deployed_old.path)190        self.assert_in_list(log_info, "Removal of files done.")191        self.assert_in_list(log_info, "Start copying of new files...")192        self.assert_in_list(log_info, "Copying directory %s/subdirnew" % deployed_old.path)193        self.assert_in_list(log_info, "Copying file %s/newfile.txt" % deployed_old.path)194        self.assert_in_list(log_info, "Copying file %s/subdirboth/newfile.txt" % deployed_old.path)195        self.assert_in_list(log_info, "Copying of new files done.")196        self.assert_in_list(log_info, "Start copying of modified files...")197        self.assert_in_list(log_info, "Updating file %s/changedfile.txt" % deployed_old.path)198        self.assert_in_list(log_info, "Updating file %s/subdirboth/changedfile.txt" % deployed_old.path)199        self.assert_in_list(log_info, "Copying of modified files 
done.")200    def test_execution_ctx_logging(self):201        class ExecutionContext(object):202            def __init__(self):203                self.output_success = False204                self.error_success = False205            def logOutput(self, msg):206                self.output_success = True207            def logError(self, msg):208                self.error_success = True209        exec_ctx = ExecutionContext()210        session = OverthereHostSession(self._linuxhost, execution_context=exec_ctx)211        session.logger.info("Check")212        eq_(exec_ctx.output_success, True)213        session.logger.error("Check")...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
