Best Python code snippet using localstack_python
test_api.py
Source:test_api.py  
...
            app1, [job_id1, job_id2], all=True,
            raise_if_not_exists=True
        ))
@tt.with_setup
def test_get_qsize(app1, job_id1, job_id2):
    nt.assert_equal(api.get_qsize(app1, queued=True, taken=True), 0)
    tt.enqueue(app1, job_id1, )
    tt.enqueue(app1, job_id2, validate_queued=False)
    q = api.get_qbclient().LockingQueue(app1)
    itm = q.get()
    nt.assert_equal(2, api.get_qsize(app1, queued=True, taken=True))
    nt.assert_equal(1, api.get_qsize(app1, queued=False, taken=True))
    nt.assert_equal(1, api.get_qsize(app1, queued=True, taken=False))
    q.put(itm)
    q.consume()
    nt.assert_equal(2, api.get_qsize(app1, queued=True, taken=True))
    nt.assert_equal(0, api.get_qsize(app1, queued=False, taken=True))
    nt.assert_equal(2, api.get_qsize(app1, queued=True, taken=False))
@tt.with_setup
def test_maybe_add_subtask(app1, job_id1, job_id2, job_id3):
    # we don't queue anything if we request queue=False, but we create data for
    # this node if it doesn't exist
    tt.validate_zero_queued_task(app1)
    api.maybe_add_subtask(app1, job_id1, queue=False)
    tt.validate_zero_queued_task(app1)
    # data for this job_id exists, so it can't get queued
    api.maybe_add_subtask(app1, job_id1, priority=4)
    tt.validate_zero_queued_task(app1)
    api.maybe_add_subtask(app1, job_id2, priority=8)
    tt.validate_one_queued_task(app1, job_id2)
    api.maybe_add_subtask(app1, job_id3, priority=5)
    # this should have no effect because it's already queued with priority=5
...
parallel_generator.py
Source:parallel_generator.py  
...
        i = 0
        # before = time.time()

        while not self.buffer.full():
            if self.get_qsize():
                buffer_size = self.get_qsize()
                print('Filling train buffer: {}/{}'.format(
                    self.get_qsize(),
                    str(self.buffer_size).ljust(len(str(buffer_size)))
                ), end='\r')
            else:
                print('Filling {} buffer'.format('train')
                    + '.' * i + ' ' * (3 - i), end='\r')
                i += 1
                i = i % 4
            time.sleep(.5)
        print("Filling {} buffer... Done.".format('train'))
        # print('\n\n\n\n\n TOTAL TIME: ', time.time() - before, '\n\n\n')
    def fill_test_registry(self):
        return cp(self.testing)
    def reset_test(self):
        self.test_registry = cp(self.testing)
    def push(self, buffer):
        """
        Loads and augments datapoint with a seed
        and places it in a multiprocessing buffer.
        BEWARE: This method is called by multiple subprocesses.
        Ensure thread safety!
        """
        # We have to seed the differnt processes or else they
        # will draw identical samples
        np.random.seed(os.getpid())
        # We also might want to initialize each process
        # on different classes if class balancing is desired.
        while True:
            items = self.get_datapoints(seed=os.getpid())
            for item in items:
                buffer.put(item)
                # print("Process #{} successfully added item of class {} to buffer.".format(os.getpid(), idx))
        # print('Producer {} exiting'.format(os.getpid()))
    def get_datapoints(self, **kwargs):
        """
        Method that from a seed loads a datapoint (x and y) and
        places returns them in a list for pushing to the buffer.
        This method is the bread and butter of the generator class,
        everything else in this class are simply support methods for
        maintaining the queue/buffer and sorting the data samples.
        This method should actively load the data point from the drive,
        normalize and augment.
        """
        raise NotImplementedError
    def get_qsize(self):
        try:
            return self.buffer.qsize()
        except NotImplementedError:
            return False
    def dtype_to_order(self, dtype):
        if dtype in [np.int8, np.int16, np.int32, np.int64,
                     np.uint8, np.uint16, np.uint32, np.uint64,
                     bool, np.bool]:
            return 0
        return 3
    def __call__(self, batch_size, handle='train', patient_handle=None):
        out = []
        if handle == 'train':
            for _ in range(batch_size):
...
simple_thread_runner.py
Source:simple_thread_runner.py  
...
            self._queue.task_done()
            logger.info(f"Tasks left:{i}")
    def q_producer(self, _data):
        self._queue.put(_data)
    def get_qsize(self) -> int:
        """Get current size of queue, be aware this value is changed frequently
        as multiple threads may produce/consume data to the queue"""
        return self._queue.qsize()
    def q_consumer(self, num_workers: int, fn: Callable[..., Any]):
        """
        Function can be used separately with q_producer
        """
        with self._lock:
            try:
                self.prepare_threads(num_workers, fn)
            finally:
                self.wait_threads()
    def run_threads(self, num_workers: int, fn: Callable[..., Any], iter_data: Iterator[Any], batch_size: int = None):
        """Add batch_size params in case iter_data is huge number"""
        for _ in iter_data:
            self.q_producer(_)
            if batch_size:
                _qsize = self.get_qsize()
                if _qsize >= batch_size:
                    self.q_consumer(num_workers, fn)
        _qsize = self.get_qsize()
        if _qsize != 0:
...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
