Best Python code snippet using localstack_python
test_robot_command.py
Source:test_robot_command.py  
...83    assert isinstance(command, robot_command_pb2.RobotCommand)84    assert command.HasField("full_body_command")85    assert not command.HasField("mobility_command")86    assert not command.HasField("synchronized_command")87def _test_has_synchronized(command):88    assert isinstance(command, robot_command_pb2.RobotCommand)89    assert command.HasField("synchronized_command")90    assert not command.HasField("full_body_command")91    assert not command.HasField("mobility_command")92def _test_has_mobility(command):93    assert isinstance(command, synchronized_command_pb2.SynchronizedCommand.Request)94    assert command.HasField("mobility_command")95def _test_has_mobility_deprecated(command):96    assert isinstance(command, robot_command_pb2.RobotCommand)97    assert command.HasField("mobility_command")98def _test_has_arm(command):99    assert isinstance(command, synchronized_command_pb2.SynchronizedCommand.Request)100    assert command.HasField("arm_command")101def _test_has_gripper(command):102    assert isinstance(command, synchronized_command_pb2.SynchronizedCommand.Request)103    assert command.HasField("gripper_command")104def test_stop_command():105    command = RobotCommandBuilder.stop_command()106    _test_has_full_body(command)107    assert command.full_body_command.HasField("stop_request")108def test_freeze_command():109    command = RobotCommandBuilder.freeze_command()110    _test_has_full_body(command)111    assert command.full_body_command.HasField("freeze_request")112def test_selfright_command():113    command = RobotCommandBuilder.selfright_command()114    _test_has_full_body(command)115    assert command.full_body_command.HasField("selfright_request")116def test_safe_power_off_command():117    command = RobotCommandBuilder.safe_power_off_command()118    _test_has_full_body(command)119    assert command.full_body_command.HasField("safe_power_off_request")120def test_trajectory_command():121    goal_x = 1.0122    goal_y = 2.0123    goal_heading = 3.0124    frame = ODOM_FRAME_NAME125    command = RobotCommandBuilder.trajectory_command(goal_x, goal_y, goal_heading, frame)126    _test_has_mobility_deprecated(command)127    assert command.mobility_command.HasField("se2_trajectory_request")128    traj = command.mobility_command.se2_trajectory_request.trajectory129    assert len(traj.points) == 1130    assert traj.points[0].pose.position.x == goal_x131    assert traj.points[0].pose.position.y == goal_y132    assert traj.points[0].pose.angle == goal_heading133    assert command.mobility_command.se2_trajectory_request.se2_frame_name == ODOM_FRAME_NAME134def test_velocity_command():135    v_x = 1.0136    v_y = 2.0137    v_rot = 3.0138    command = RobotCommandBuilder.velocity_command(v_x, v_y, v_rot)139    _test_has_mobility_deprecated(command)140    assert command.mobility_command.HasField("se2_velocity_request")141    vel_cmd = command.mobility_command.se2_velocity_request142    assert vel_cmd.velocity.linear.x == v_x143    assert vel_cmd.velocity.linear.y == v_y144    assert vel_cmd.velocity.angular == v_rot145    assert vel_cmd.se2_frame_name == BODY_FRAME_NAME146def test_stand_command():147    command = RobotCommandBuilder.stand_command()148    _test_has_mobility_deprecated(command)149    assert command.mobility_command.HasField("stand_request")150def test_sit_command():151    command = RobotCommandBuilder.sit_command()152    _test_has_mobility_deprecated(command)153    assert command.mobility_command.HasField("sit_request")154def _check_se2_traj_command(command, goal_x, goal_y, goal_heading, frame_name, n_points):155    _test_has_synchronized(command)156    _test_has_mobility(command.synchronized_command)157    assert command.synchronized_command.mobility_command.HasField("se2_trajectory_request")158    request = command.synchronized_command.mobility_command.se2_trajectory_request159    assert len(request.trajectory.points) == n_points160    assert request.trajectory.points[0].pose.position.x == goal_x161    assert request.trajectory.points[0].pose.position.y == goal_y162    assert request.trajectory.points[0].pose.angle == goal_heading163    assert request.se2_frame_name == frame_name164def test_synchro_se2_trajectory_point_command():165    goal_x = 1.0166    goal_y = 2.0167    goal_heading = 3.0168    frame = ODOM_FRAME_NAME169    command = RobotCommandBuilder.synchro_se2_trajectory_point_command(170        goal_x, goal_y, goal_heading, frame)171    _check_se2_traj_command(command, goal_x, goal_y, goal_heading, frame, 1)172    # with a build_on_command173    arm_command = RobotCommandBuilder.arm_stow_command()174    command = RobotCommandBuilder.synchro_se2_trajectory_point_command(175        goal_x, goal_y, goal_heading, frame, build_on_command=arm_command)176    _check_se2_traj_command(command, goal_x, goal_y, goal_heading, frame, 1)177    _test_has_arm(command.synchronized_command)178def test_synchro_se2_trajectory_command():179    goal_x = 1.0180    goal_y = 2.0181    goal_heading = 3.0182    frame = ODOM_FRAME_NAME183    position = geometry_pb2.Vec2(x=goal_x, y=goal_y)184    goal_se2 = geometry_pb2.SE2Pose(position=position, angle=goal_heading)185    command = RobotCommandBuilder.synchro_se2_trajectory_command(goal_se2, frame)186    _check_se2_traj_command(command, goal_x, goal_y, goal_heading, frame, 1)187    # with a build_on_command188    arm_command = RobotCommandBuilder.arm_stow_command()189    command = RobotCommandBuilder.synchro_se2_trajectory_command(goal_se2, frame,190                                                                 build_on_command=arm_command)191    _check_se2_traj_command(command, goal_x, goal_y, goal_heading, frame, 1)192    _test_has_arm(command.synchronized_command)193def test_synchro_velocity_command():194    v_x = 1.0195    v_y = 2.0196    v_rot = 3.0197    command = RobotCommandBuilder.synchro_velocity_command(v_x, v_y, v_rot)198    _test_has_synchronized(command)199    _test_has_mobility(command.synchronized_command)200    assert command.synchronized_command.mobility_command.HasField("se2_velocity_request")201    vel_cmd = command.synchronized_command.mobility_command.se2_velocity_request202    assert vel_cmd.velocity.linear.x == v_x203    assert vel_cmd.velocity.linear.y == v_y204    assert vel_cmd.velocity.angular == v_rot205    assert vel_cmd.se2_frame_name == BODY_FRAME_NAME206    # with a build_on_command207    arm_command = RobotCommandBuilder.arm_stow_command()208    command = RobotCommandBuilder.synchro_velocity_command(v_x, v_y, v_rot,209                                                           build_on_command=arm_command)210    _test_has_synchronized(command)211    _test_has_mobility(command.synchronized_command)212    _test_has_arm(command.synchronized_command)213def test_synchro_stand_command():214    command = RobotCommandBuilder.synchro_stand_command()215    _test_has_synchronized(command)216    _test_has_mobility(command.synchronized_command)217    assert command.synchronized_command.mobility_command.HasField("stand_request")218    # basic check with a build_on_command219    arm_command = RobotCommandBuilder.arm_stow_command()220    command = RobotCommandBuilder.synchro_stand_command(build_on_command=arm_command)221    _test_has_synchronized(command)222    _test_has_mobility(command.synchronized_command)223    _test_has_arm(command.synchronized_command)224def test_synchro_sit_command():225    command = RobotCommandBuilder.synchro_sit_command()226    _test_has_synchronized(command)227    _test_has_mobility(command.synchronized_command)228    assert command.synchronized_command.mobility_command.HasField("sit_request")229    # with a build_on_command230    arm_command = RobotCommandBuilder.arm_stow_command()231    command = RobotCommandBuilder.synchro_sit_command(build_on_command=arm_command)232    _test_has_synchronized(command)233    _test_has_mobility(command.synchronized_command)234    _test_has_arm(command.synchronized_command)235def test_arm_stow_command():236    command = RobotCommandBuilder.arm_stow_command()237    _test_has_synchronized(command)238    _test_has_arm(command.synchronized_command)239    assert (command.synchronized_command.arm_command.WhichOneof("command") ==240            'named_arm_position_command')241    # with a build_on_command242    mobility_command = RobotCommandBuilder.synchro_sit_command()243    command = RobotCommandBuilder.arm_stow_command(build_on_command=mobility_command)244    _test_has_synchronized(command)245    _test_has_mobility(command.synchronized_command)246    _test_has_arm(command.synchronized_command)247def test_arm_ready_command():248    command = RobotCommandBuilder.arm_ready_command()249    _test_has_synchronized(command)250    _test_has_arm(command.synchronized_command)251    assert (command.synchronized_command.arm_command.WhichOneof("command") ==252            'named_arm_position_command')253    # with a build_on_command254    mobility_command = RobotCommandBuilder.synchro_sit_command()255    command = RobotCommandBuilder.arm_ready_command(build_on_command=mobility_command)256    _test_has_synchronized(command)257    _test_has_mobility(command.synchronized_command)258    _test_has_arm(command.synchronized_command)259def test_arm_carry_command():260    command = RobotCommandBuilder.arm_ready_command()261    _test_has_synchronized(command)262    _test_has_arm(command.synchronized_command)263    assert (command.synchronized_command.arm_command.WhichOneof("command") ==264            'named_arm_position_command')265    # with a build_on_command266    mobility_command = RobotCommandBuilder.synchro_sit_command()267    command = RobotCommandBuilder.arm_carry_command(build_on_command=mobility_command)268    _test_has_synchronized(command)269    _test_has_mobility(command.synchronized_command)270    _test_has_arm(command.synchronized_command)271def test_arm_pose_command():272    x = 0.75273    y = 0274    z = 0.25275    qw = 1276    qx = 0277    qy = 0278    qz = 0279    command = RobotCommandBuilder.arm_pose_command(x, y, z, qw, qx, qy, qz, BODY_FRAME_NAME)280    _test_has_synchronized(command)281    _test_has_arm(command.synchronized_command)282    assert command.synchronized_command.arm_command.HasField("arm_cartesian_command")283    arm_cartesian_command = command.synchronized_command.arm_command.arm_cartesian_command284    assert arm_cartesian_command.root_frame_name == BODY_FRAME_NAME285    assert arm_cartesian_command.pose_trajectory_in_task.points[0].pose.position.x == x286    assert arm_cartesian_command.pose_trajectory_in_task.points[0].pose.position.y == y287    assert arm_cartesian_command.pose_trajectory_in_task.points[0].pose.position.z == z288    assert arm_cartesian_command.pose_trajectory_in_task.points[0].pose.rotation.x == qx289    assert arm_cartesian_command.pose_trajectory_in_task.points[0].pose.rotation.y == qy290    assert arm_cartesian_command.pose_trajectory_in_task.points[0].pose.rotation.z == qz291    assert arm_cartesian_command.pose_trajectory_in_task.points[0].pose.rotation.w == qw292    # with a build_on_command293    mobility_command = RobotCommandBuilder.synchro_sit_command()294    command = RobotCommandBuilder.arm_pose_command(x, y, z, qw, qx, qy, qz, BODY_FRAME_NAME,295                                                   build_on_command=mobility_command)296    _test_has_synchronized(command)297    _test_has_mobility(command.synchronized_command)298    _test_has_arm(command.synchronized_command)299def test_claw_gripper_open_command():300    command = RobotCommandBuilder.claw_gripper_open_command()301    _test_has_synchronized(command)302    _test_has_gripper(command.synchronized_command)303    assert (command.synchronized_command.gripper_command.WhichOneof("command") ==304            "claw_gripper_command")305    # with a build_on_command306    mobility_command = RobotCommandBuilder.synchro_sit_command()307    command = RobotCommandBuilder.claw_gripper_open_command(build_on_command=mobility_command)308    _test_has_synchronized(command)309    _test_has_mobility(command.synchronized_command)310    _test_has_gripper(command.synchronized_command)311def test_claw_gripper_close_command():312    command = RobotCommandBuilder.claw_gripper_close_command()313    _test_has_synchronized(command)314    _test_has_gripper(command.synchronized_command)315    assert (command.synchronized_command.gripper_command.WhichOneof("command") ==316            "claw_gripper_command")317    # with a build_on_command318    mobility_command = RobotCommandBuilder.synchro_sit_command()319    command = RobotCommandBuilder.claw_gripper_close_command(build_on_command=mobility_command)320    _test_has_synchronized(command)321    _test_has_mobility(command.synchronized_command)322    _test_has_gripper(command.synchronized_command)323def test_build_synchro_command():324    # two synchro subcommands of the same type:325    arm_command_1 = RobotCommandBuilder.arm_ready_command()326    arm_command_2 = RobotCommandBuilder.arm_stow_command()327    command = RobotCommandBuilder.build_synchro_command(arm_command_1, arm_command_2)328    _test_has_synchronized(command)329    _test_has_arm(command.synchronized_command)330    assert not command.synchronized_command.HasField("gripper_command")331    assert not command.synchronized_command.HasField("mobility_command")332    command_position = command.synchronized_command.arm_command.named_arm_position_command.position333    assert command_position == arm_command_pb2.NamedArmPositionsCommand.POSITIONS_STOW334    # two synchro subcommands of a different type:335    arm_command = RobotCommandBuilder.arm_ready_command()336    mobility_command = RobotCommandBuilder.synchro_stand_command()337    command = RobotCommandBuilder.build_synchro_command(arm_command, mobility_command)338    _test_has_synchronized(command)339    _test_has_mobility(command.synchronized_command)340    _test_has_arm(command.synchronized_command)341    assert not command.synchronized_command.HasField("gripper_command")342    assert command.synchronized_command.mobility_command.HasField("stand_request")343    command_position = command.synchronized_command.arm_command.named_arm_position_command.position344    assert command_position == arm_command_pb2.NamedArmPositionsCommand.POSITIONS_READY345    # one synchro command and one deprecated mobility command:346    deprecated_mobility_command = RobotCommandBuilder.sit_command()347    command = RobotCommandBuilder.build_synchro_command(mobility_command,348                                                        deprecated_mobility_command)349    _test_has_synchronized(command)350    _test_has_mobility(command.synchronized_command)351    assert command.synchronized_command.mobility_command.HasField("sit_request")352    # fullbody command is rejected353    full_body_command = RobotCommandBuilder.selfright_command()354    with pytest.raises(Exception):355        command = RobotCommandBuilder.build_synchro_command(arm_command, full_body_command)356def test_edit_timestamps():357    def _set_new_time(key, proto):358        """If proto has a field named key, fill set it to end_time_secs as robot time. """359        if key not in proto.DESCRIPTOR.fields_by_name:360            return  # No such field in the proto to be set to the end-time.361        end_time = getattr(proto, key)362        end_time.CopyFrom(timestamp_pb2.Timestamp(seconds=10))363    command = robot_command_pb2.RobotCommand()...state.py
Source:state.py  
1import logging2import threading3from collections import defaultdict, namedtuple4logger = logging.getLogger(__name__)5Offsets = namedtuple("Offsets", "local remote")6class InvalidState(Exception):7    pass8class InvalidStateTransition(Exception):9    pass10class MessageNotReady(Exception):11    pass12class SynchronizedPartitionState:13    # The ``SYNCHRONIZED`` state represents that the local offset is equal to14    # the remote offset. The local consumer should be paused to avoid advancing15    # further beyond the remote consumer.16    SYNCHRONIZED = "SYNCHRONIZED"17    # The ``LOCAL_BEHIND`` state represents that the remote offset is greater18    # than the local offset. The local consumer should be unpaused to avoid19    # falling behind the remote consumer.20    LOCAL_BEHIND = "LOCAL_BEHIND"21    # The ``REMOTE_BEHIND`` state represents that the local offset is greater22    # than the remote offset. The local consumer should be paused to avoid23    # advancing further beyond the remote consumer.24    REMOTE_BEHIND = "REMOTE_BEHIND"25    # The ``UNKNOWN`` state represents that we haven't received enough data to26    # know the current offset state.27    UNKNOWN = "UNKNOWN"28class SynchronizedPartitionStateManager(object):29    """30    This class implements a state machine that can be used to track the31    consumption progress of a Kafka partition (the "local" consumer) relative32    to a the progress of another consumer (the "remote" consumer.)33    This is intended to be paired with the ``SynchronizedConsumer``.34    """35    transitions = {  # from state -> set(to states)36        None: frozenset([SynchronizedPartitionState.UNKNOWN]),37        SynchronizedPartitionState.UNKNOWN: frozenset(38            [39                SynchronizedPartitionState.LOCAL_BEHIND,40                SynchronizedPartitionState.REMOTE_BEHIND,41                SynchronizedPartitionState.SYNCHRONIZED,42            ]43        ),44        SynchronizedPartitionState.REMOTE_BEHIND: frozenset(45            [SynchronizedPartitionState.LOCAL_BEHIND, SynchronizedPartitionState.SYNCHRONIZED]46        ),47        SynchronizedPartitionState.LOCAL_BEHIND: frozenset(48            [SynchronizedPartitionState.SYNCHRONIZED, SynchronizedPartitionState.REMOTE_BEHIND]49        ),50        SynchronizedPartitionState.SYNCHRONIZED: frozenset(51            [SynchronizedPartitionState.LOCAL_BEHIND, SynchronizedPartitionState.REMOTE_BEHIND]52        ),53    }54    def __init__(self, callback):55        self.partitions = defaultdict(lambda: (None, Offsets(None, None)))56        self.callback = callback57        self.__lock = threading.RLock()58    def get_state_from_offsets(self, offsets):59        """60        Derive the partition state by comparing local and remote offsets.61        """62        if offsets.local is None or offsets.remote is None:63            return SynchronizedPartitionState.UNKNOWN64        else:65            if offsets.local < offsets.remote:66                return SynchronizedPartitionState.LOCAL_BEHIND67            elif offsets.remote < offsets.local:68                return SynchronizedPartitionState.REMOTE_BEHIND69            else:  # local == remote70                return SynchronizedPartitionState.SYNCHRONIZED71    def set_local_offset(self, topic, partition, local_offset):72        """73        Update the local offset for a topic and partition.74        If this update operation results in a state change, the callback75        function will be invoked.76        """77        with self.__lock:78            previous_state, previous_offsets = self.partitions[(topic, partition)]79            if previous_offsets.local is not None and (80                local_offset is None or local_offset < previous_offsets.local81            ):82                logger.info(83                    "Local offset for %s/%s has moved backwards (current: %s, previous: %s)",84                    topic,85                    partition,86                    local_offset,87                    previous_offsets.local,88                )89            updated_offsets = Offsets(local_offset, previous_offsets.remote)90            updated_state = self.get_state_from_offsets(updated_offsets)91            if (92                previous_state is not updated_state93                and updated_state not in self.transitions[previous_state]94            ):95                raise InvalidStateTransition(96                    "Unexpected state transition for {}/{} from {} to {}".format(97                        topic, partition, previous_state, updated_state98                    )99                )100            self.partitions[(topic, partition)] = (updated_state, updated_offsets)101            if previous_state is not updated_state:102                if updated_state == SynchronizedPartitionState.REMOTE_BEHIND:103                    logger.warning(104                        "Current local offset for %s/%s (%s) exceeds remote offset (%s)!",105                        topic,106                        partition,107                        updated_offsets.local,108                        updated_offsets.remote,109                    )110                self.callback(111                    topic,112                    partition,113                    (previous_state, previous_offsets),114                    (updated_state, updated_offsets),115                )116    def set_remote_offset(self, topic, partition, remote_offset):117        """118        Update the remote offset for a topic and partition.119        If this update operation results in a state change, the callback120        function will be invoked.121        """122        with self.__lock:123            previous_state, previous_offsets = self.partitions[(topic, partition)]124            if previous_offsets.remote is not None and (125                remote_offset is None or remote_offset < previous_offsets.remote126            ):127                logger.info(128                    "Remote offset for %s/%s has moved backwards (current: %s, previous: %s)",129                    topic,130                    partition,131                    remote_offset,132                    previous_offsets.remote,133                )134            updated_offsets = Offsets(previous_offsets.local, remote_offset)135            updated_state = self.get_state_from_offsets(updated_offsets)136            if (137                previous_state is not updated_state138                and updated_state not in self.transitions[previous_state]139            ):140                raise InvalidStateTransition(141                    "Unexpected state transition for {}/{} from {} to {}".format(142                        topic, partition, previous_state, updated_state143                    )144                )145            self.partitions[(topic, partition)] = (updated_state, updated_offsets)146            if previous_state is not updated_state:147                self.callback(148                    topic,149                    partition,150                    (previous_state, previous_offsets),151                    (updated_state, updated_offsets),152                )153    def validate_local_message(self, topic, partition, offset):154        """155        Check if a message should be consumed by the local consumer.156        The local consumer should be prevented from consuming messages that157        have yet to have been committed by the remote consumer.158        """159        with self.__lock:160            state, offsets = self.partitions[(topic, partition)]161            if state is not SynchronizedPartitionState.LOCAL_BEHIND:162                raise InvalidState(163                    "Received a message while consumer is not in LOCAL_BEHIND state!"164                )165            if offset >= offsets.remote:166                raise MessageNotReady(167                    "Received a message that has not been committed by remote consumer"168                )169            if offset < offsets.local:170                logger.warning(171                    "Received a message prior to local offset (local consumer offset rewound without update?)"...test_synchronized_lock.py
Source:test_synchronized_lock.py  
1from __future__ import print_function2import unittest3import wrapt4@wrapt.synchronized5def function():6    print('function')7class C1(object):8    @wrapt.synchronized9    def function1(self):10        print('function1')11    @wrapt.synchronized12    @classmethod13    def function2(cls):14        print('function2')15    @wrapt.synchronized16    @staticmethod17    def function3():18        print('function3')19c1 = C1()20@wrapt.synchronized21class C2(object):22    pass23@wrapt.synchronized24class C3:25    pass26class C4(object):27    # XXX This yields undesirable results due to how class method is28    # implemented. The classmethod doesn't bind the method to the class29    # before calling. As a consequence, the decorator wrapper function30    # sees the instance as None with the class being explicitly passed31    # as the first argument. It isn't possible to detect and correct32    # this.33    @classmethod34    @wrapt.synchronized35    def function2(cls):36        print('function2')37    @staticmethod38    @wrapt.synchronized39    def function3():40        print('function3')41c4 = C4()42class TestSynchronized(unittest.TestCase):43    def test_synchronized_function(self):44        _lock0 = getattr(function, '_synchronized_lock', None)45        self.assertEqual(_lock0, None)46        function()47        _lock1 = getattr(function, '_synchronized_lock', None)48        self.assertNotEqual(_lock1, None)49        function()50        _lock2 = getattr(function, '_synchronized_lock', None)51        self.assertNotEqual(_lock2, None)52        self.assertEqual(_lock2, _lock1)53        function()54        _lock3 = getattr(function, '_synchronized_lock', None)55        self.assertNotEqual(_lock3, None)56        self.assertEqual(_lock3, _lock2)57    def test_synchronized_inner_staticmethod(self):58        _lock0 = getattr(C1.function3, '_synchronized_lock', None)59        self.assertEqual(_lock0, None)60        c1.function3()61        _lock1 = getattr(C1.function3, '_synchronized_lock', None)62        self.assertNotEqual(_lock1, None)63        C1.function3()64        _lock2 = getattr(C1.function3, '_synchronized_lock', None)65        self.assertNotEqual(_lock2, None)66        self.assertEqual(_lock2, _lock1)67        C1.function3()68        _lock3 = getattr(C1.function3, '_synchronized_lock', None)69        self.assertNotEqual(_lock3, None)70        self.assertEqual(_lock3, _lock2)71    def test_synchronized_outer_staticmethod(self):72        _lock0 = getattr(C4.function3, '_synchronized_lock', None)73        self.assertEqual(_lock0, None)74        c4.function3()75        _lock1 = getattr(C4.function3, '_synchronized_lock', None)76        self.assertNotEqual(_lock1, None)77        C4.function3()78        _lock2 = getattr(C4.function3, '_synchronized_lock', None)79        self.assertNotEqual(_lock2, None)80        self.assertEqual(_lock2, _lock1)81        C4.function3()82        _lock3 = getattr(C4.function3, '_synchronized_lock', None)83        self.assertNotEqual(_lock3, None)84        self.assertEqual(_lock3, _lock2)85    def test_synchronized_inner_classmethod(self):86        if hasattr(C1, '_synchronized_lock'):87            del C1._synchronized_lock88        _lock0 = getattr(C1, '_synchronized_lock', None)89        self.assertEqual(_lock0, None)90        c1.function2()91        _lock1 = getattr(C1, '_synchronized_lock', None)92        self.assertNotEqual(_lock1, None)93        C1.function2()94        _lock2 = getattr(C1, '_synchronized_lock', None)95        self.assertNotEqual(_lock2, None)96        self.assertEqual(_lock2, _lock1)97        C1.function2()98        _lock3 = getattr(C1, '_synchronized_lock', None)99        self.assertNotEqual(_lock3, None)100        self.assertEqual(_lock3, _lock2)101    def test_synchronized_outer_classmethod(self):102        # XXX If all was good, this would be detected as a class103        # method call, but the classmethod decorator doesn't bind104        # the wrapped function to the class before calling and105        # just calls it direct, explicitly passing the class as106        # first argument. This screws things up. Would be nice if107        # Python were fixed, but that isn't likely to happen.108        #_lock0 = getattr(C4, '_synchronized_lock', None)109        _lock0 = getattr(C4.function2, '_synchronized_lock', None)110        self.assertEqual(_lock0, None)111        c4.function2()112        #_lock1 = getattr(C4, '_synchronized_lock', None)113        _lock1 = getattr(C4.function2, '_synchronized_lock', None)114        self.assertNotEqual(_lock1, None)115        C4.function2()116        #_lock2 = getattr(C4, '_synchronized_lock', None)117        _lock2 = getattr(C4.function2, '_synchronized_lock', None)118        self.assertNotEqual(_lock2, None)119        self.assertEqual(_lock2, _lock1)120        C4.function2()121        #_lock3 = getattr(C4, '_synchronized_lock', None)122        _lock3 = getattr(C4.function2, '_synchronized_lock', None)123        self.assertNotEqual(_lock3, None)124        self.assertEqual(_lock3, _lock2)125    def test_synchronized_instancemethod(self):126        if hasattr(C1, '_synchronized_lock'):127            del C1._synchronized_lock128        _lock0 = getattr(c1, '_synchronized_lock', None)129        self.assertEqual(_lock0, None)130        C1.function1(c1)131        _lock1 = getattr(c1, '_synchronized_lock', None)132        self.assertNotEqual(_lock1, None)133        c1.function1()134        _lock2 = getattr(c1, '_synchronized_lock', None)135        self.assertNotEqual(_lock2, None)136        self.assertEqual(_lock2, _lock1)137        c1.function1()138        _lock3 = getattr(c1, '_synchronized_lock', None)139        self.assertNotEqual(_lock3, None)140        self.assertEqual(_lock3, _lock2)141        del c1._synchronized_lock142        C1.function2()143        _lock4 = getattr(C1, '_synchronized_lock', None)144        self.assertNotEqual(_lock4, None)145        c1.function1()146        _lock5 = getattr(c1, '_synchronized_lock', None)147        self.assertNotEqual(_lock5, None)148        self.assertNotEqual(_lock5, _lock4)149    def test_synchronized_type_new_style(self):150        if hasattr(C2, '_synchronized_lock'):151            del C2._synchronized_lock152        _lock0 = getattr(C2, '_synchronized_lock', None)153        self.assertEqual(_lock0, None)154        c2 = C2()155        _lock1 = getattr(C2, '_synchronized_lock', None)156        self.assertNotEqual(_lock1, None)157        c2 = C2()158        _lock2 = getattr(C2, '_synchronized_lock', None)159        self.assertNotEqual(_lock2, None)160        self.assertEqual(_lock2, _lock1)161        c2 = C2()162        _lock3 = getattr(C2, '_synchronized_lock', None)163        self.assertNotEqual(_lock3, None)164        self.assertEqual(_lock3, _lock2)165    def test_synchronized_type_old_style(self):166        if hasattr(C3, '_synchronized_lock'):167            del C3._synchronized_lock168        _lock0 = getattr(C3, '_synchronized_lock', None)169        self.assertEqual(_lock0, None)170        c2 = C3()171        _lock1 = getattr(C3, '_synchronized_lock', None)172        self.assertNotEqual(_lock1, None)173        c2 = C3()174        _lock2 = getattr(C3, '_synchronized_lock', None)175        self.assertNotEqual(_lock2, None)176        self.assertEqual(_lock2, _lock1)177        c2 = C3()178        _lock3 = getattr(C3, '_synchronized_lock', None)179        self.assertNotEqual(_lock3, None)180        self.assertEqual(_lock3, _lock2)181if __name__ == '__main__':...sharedctypes.py
Source:sharedctypes.py  
...67        ctx = ctx or get_context()68        lock = ctx.RLock()69    if not hasattr(lock, 'acquire'):70        raise AttributeError("%r has no method 'acquire'" % lock)71    return synchronized(obj, lock, ctx=ctx)72def Array(typecode_or_type, size_or_initializer, *, lock=True, ctx=None):73    '''74    Return a synchronization wrapper for a RawArray75    '''76    obj = RawArray(typecode_or_type, size_or_initializer)77    if lock is False:78        return obj79    if lock in (True, None):80        ctx = ctx or get_context()81        lock = ctx.RLock()82    if not hasattr(lock, 'acquire'):83        raise AttributeError("%r has no method 'acquire'" % lock)84    return synchronized(obj, lock, ctx=ctx)85def copy(obj):86    new_obj = _new_value(type(obj))87    ctypes.pointer(new_obj)[0] = obj88    return new_obj89def synchronized(obj, lock=None, ctx=None):90    assert not isinstance(obj, SynchronizedBase), 'object already synchronized'91    ctx = ctx or get_context()92    if isinstance(obj, ctypes._SimpleCData):93        return Synchronized(obj, lock, ctx)94    elif isinstance(obj, ctypes.Array):95        if obj._type_ is ctypes.c_char:96            return SynchronizedString(obj, lock, ctx)97        return SynchronizedArray(obj, lock, ctx)98    else:99        cls = type(obj)100        try:101            scls = class_cache[cls]102        except KeyError:103            names = [field[0] for field in cls._fields_]...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
