Best Python code snippet using localstack_python
Loops.py
Source:Loops.py  
import os
import os.path
import sys
import threading
import asyncio
from datetime import datetime

from elasticsearch import Elasticsearch
from osbot_utils.utils.Files import create_folder, folder_exists

from cdr_plugin_folder_to_folder.common_settings.Config import Config
from cdr_plugin_folder_to_folder.processing.Events_Log import Events_Log
from cdr_plugin_folder_to_folder.processing.File_Processing import File_Processing
from cdr_plugin_folder_to_folder.metadata.Metadata_Service import Metadata_Service
from cdr_plugin_folder_to_folder.pre_processing.Status import Status
from cdr_plugin_folder_to_folder.pre_processing.Hash_Json import Hash_Json
from cdr_plugin_folder_to_folder.pre_processing.Status import FileStatus
from cdr_plugin_folder_to_folder.utils.Log_Duration import log_duration
from cdr_plugin_folder_to_folder.utils.Logging import log_error, log_info


class Loops(object):
    """Run the processing loop over the hash folders listed in ``hash_json``.

    One worker thread is started per file whose status is ``INITIAL``; each
    worker hands its folder to ``File_Processing.processDirectory`` together
    with one of the configured endpoints, falling back to the next endpoint
    when one fails.

    The three flags below are class attributes, so they are shared by every
    ``Loops`` instance in the process.
    """

    # False asks a running loop to stop before starting further workers.
    continue_processing = False
    # True while LoopHashDirectoriesAsync is executing.
    processing_started = False
    # NOTE(review): asyncio primitives are documented as not thread-safe, and
    # every public entry point runs on its own event loop. This lock therefore
    # does not serialise callers coming from different threads; the
    # IsProcessing() check is the only guard there. Kept for compatibility.
    lock = asyncio.Lock()

    def __init__(self):
        self.use_es = False
        self.config = Config().load_values()
        self.status = Status()
        self.hash_json = Hash_Json()
        self.status.get_from_file()
        self.hash_json.get_from_file()
        self.events = Events_Log(os.path.join(self.config.hd2_location, "status"))

    def IsProcessing(self):
        """Return True while a processing loop is running."""
        return Loops.processing_started

    def StopProcessing(self):
        """Ask the running loop to stop before it starts further workers."""
        Loops.continue_processing = False

    def HasBeenStopped(self):
        """Return True when the continue flag is cleared."""
        return not Loops.continue_processing

    def _record_outcome(self, meta_service, itempath, file_index, status, error):
        """Store ``status``/``error`` in the metadata, the counters and hash_json."""
        meta_service.set_error(itempath, error)
        meta_service.set_status(itempath, status)
        self.status.update_counters(file_index, status)
        self.hash_json.update_status(file_index, status)

    @log_duration
    def ProcessDirectoryWithEndpoint(self, itempath, file_index, endpoint_index):
        """Process one hash folder with the endpoint at ``endpoint_index``.

        :param itempath: path of the hash folder to process
        :param file_index: id of the file, used to update counters and hash_json
        :param endpoint_index: index into ``config.endpoints['Endpoints']``
        :return: True when the folder was processed; False when ``itempath``
                 is not a directory, when the endpoint could not process it,
                 or when an exception was raised (the file is then marked
                 FAILED).
        """
        self.config = Config().load_values()
        meta_service = Metadata_Service()
        original_file_path = meta_service.get_original_file_path(itempath)
        file_processing = File_Processing()
        events = Events_Log(itempath)

        target = self.config.endpoints['Endpoints'][endpoint_index]
        endpoint = "http://" + target['IP'] + ":" + target['Port']
        events.add_log("Processing with: " + endpoint)

        if not os.path.isdir(itempath):
            # Previously fell off the end of the function (implicit None).
            return False

        try:
            if not file_processing.processDirectory(endpoint, itempath):
                events.add_log("CANNOT be processed")
                return False

            log_data = {
                'file': original_file_path,
                'status': FileStatus.COMPLETED.value,
                'error': 'none',
                'timestamp': datetime.now(),
            }
            log_info('ProcessDirectoryWithEndpoint', data=log_data)
            self._record_outcome(meta_service, itempath, file_index,
                                 FileStatus.COMPLETED.value, "none")
            events.add_log("Has been processed")
            return True
        except Exception as error:
            # Worker-thread boundary: record the failure instead of letting
            # the exception kill the thread silently.
            log_data = {
                'file': original_file_path,
                'status': FileStatus.FAILED.value,
                'error': str(error),
            }
            log_error('error in ProcessDirectoryWithEndpoint', data=log_data)
            self._record_outcome(meta_service, itempath, file_index,
                                 FileStatus.FAILED.value, str(error))
            events.add_log("ERROR:" + str(error))
            return False

    @log_duration
    def ProcessDirectory(self, itempath, file_index, process_index):
        """Process one hash folder, trying each configured endpoint at most once.

        The first endpoint tried is ``process_index % endpoints_count``; on
        failure the next endpoint (wrapping around) is tried.
        """
        self.config = Config().load_values()
        endpoints_count = self.config.endpoints_count
        if endpoints_count <= 0:
            # Avoids ZeroDivisionError in the modulo below.
            log_error("ERROR: no endpoints configured, cannot process: " + str(itempath))
            return

        endpoint_index = process_index % endpoints_count
        for _ in range(endpoints_count):
            if self.ProcessDirectoryWithEndpoint(itempath, file_index, endpoint_index):
                return
            # The Endpoint failed to process the file
            # Retry it with the next one
            endpoint_index = (endpoint_index + 1) % endpoints_count

    @staticmethod
    def _join_all(threads):
        """Wait for every thread in ``threads`` and then empty the list."""
        for worker in threads:
            worker.join()
        threads.clear()

    @log_duration
    def LoopHashDirectoriesInternal(self, thread_count, do_single):
        """Start a worker thread for every file whose status is INITIAL.

        :param thread_count: number of workers started before the loop waits
                             for them to finish (values below 1 are treated as 1)
        :param do_single: when true, stop after starting the first worker
        """
        self.events.get_from_file()
        self.events.add_log("LoopHashDirectoriesAsync started")
        self.status.get_from_file()
        self.hash_json.get_from_file()

        rootdir = os.path.join(self.config.hd2_location, "data")
        if not folder_exists(rootdir):
            log_error("ERROR: rootdir does not exist: " + rootdir)
            return

        batch_size = max(1, int(thread_count))
        threads = []
        process_index = 0

        for entry in self.hash_json.get_file_list():
            if entry["file_status"] != FileStatus.INITIAL.value:
                continue

            itempath = os.path.join(rootdir, entry["hash"])
            file_index = entry["id"]
            process_index += 1

            worker = threading.Thread(target=self.ProcessDirectory,
                                      args=(itempath, file_index, process_index))
            threads.append(worker)
            worker.start()

            if do_single:
                break

            # limit the number of parallel threads: count the workers actually
            # started (process_index), not the file id, because files that are
            # skipped above leave gaps in the ids.
            if process_index % batch_size == 0:
                self._join_all(threads)

            if not Loops.continue_processing:
                break

        self._join_all(threads)

        self.status.write_to_file()
        self.hash_json.write_to_file()
        self.events.add_log("LoopHashDirectoriesAsync finished")

    @log_duration
    async def LoopHashDirectoriesAsync(self, thread_count, do_single = False):
        """Run the loop while holding ``Loops.lock`` and maintaining the flags."""
        async with Loops.lock:
            try:
                Loops.continue_processing = True
                Loops.processing_started = True
                self.LoopHashDirectoriesInternal(thread_count, do_single)
            finally:
                Loops.processing_started = False

    def _run_loop(self, thread_count, do_single=False):
        """Shared body of the public entry points.

        Refuses to start (returns False) while another loop is running;
        otherwise runs the loop to completion on a private event loop and
        returns True.
        """
        #Allow only a single loop to be run at a time
        if self.IsProcessing():
            log_error("ERROR: Attempt to start processing while processing is in progress")
            return False

        loop = asyncio.new_event_loop()
        try:
            asyncio.set_event_loop(loop)
            loop.run_until_complete(self.LoopHashDirectoriesAsync(thread_count, do_single))
        finally:
            # Same clean-up asyncio.run() performs: do not leave a closed loop
            # installed as the current one, and release the loop's resources.
            asyncio.set_event_loop(None)
            loop.close()
        return True

    @log_duration
    def LoopHashDirectories(self):
        """Process all INITIAL files using ``config.thread_count`` workers per batch."""
        return self._run_loop(self.config.thread_count)

    @log_duration
    def LoopHashDirectoriesSequential(self):
        """Process all INITIAL files one at a time."""
        return self._run_loop(1)

    @log_duration
    def ProcessSingleFile(self):
        """Process only the first INITIAL file.

        The source listing is cut off right after the run call; True is
        returned on success to match the sibling entry points.
        """
        return self._run_loop(1, True)

# ... (the source listing is truncated here; the next listing is test_Processor.py)
Source:test_Processor.py  
import os
import sys
from unittest import TestCase

from osbot_utils.utils.Dev import pprint
from osbot_utils.utils.Files import folder_exists, folder_create, file_copy

from cdr_plugin_folder_to_folder.common_settings.Config import Config
from cdr_plugin_folder_to_folder.pre_processing.Pre_Processor import Pre_Processor
from cdr_plugin_folder_to_folder.processing.Loops import Loops
from cdr_plugin_folder_to_folder.utils.Log_Duration import log_duration
from cdr_plugin_folder_to_folder.utils.testing.Test_Data import Test_Data
from cdr_plugin_folder_to_folder.api.routes.Processing import process_single_file
from cdr_plugin_folder_to_folder.api.routes.Processing import process_hd2_data_to_hd3
from cdr_plugin_folder_to_folder.api.routes.Processing import process_hd2_data_to_hd3_sequential

sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))))


class test_Processor(TestCase):
    """Tests for the Loops flags and the processing API routes."""

    def setUp(self) -> None:
        self.config         = Config().load_values()
        self.repotrs_path   = os.path.join(self.config.hd2_location,"reports")
        self.processed_path = os.path.join(self.config.hd2_location,"processed")

    def tearDown(self) -> None:
        pass

    def test__init__(self):
        pre_processor = Pre_Processor()
        pre_processor.clear_data_and_status_folders()       # clear output folders
        pre_processor.process_files()                       # copy files across
        assert folder_exists(self.config.hd1_location)
        assert folder_exists(self.config.hd2_location)
        assert folder_exists(self.config.hd3_location)

    def test_flags(self):
        loops = Loops()
        assert loops.IsProcessing() is False
        loops.StopProcessing()
        assert loops.HasBeenStopped() is True

    @log_duration
    def test_process_file(self):
        assert ("File has been processed" == process_single_file())
        assert len(os.listdir(self.config.hd3_location)) != 0

    @log_duration
    def test_process_files(self):
        assert ("Loop completed" == process_hd2_data_to_hd3())
        assert len(os.listdir(self.config.hd3_location)) != 0

    @log_duration
    def test_process_files_sequential(self):
        assert ("Loop completed" == process_hd2_data_to_hd3_sequential())
        assert len(os.listdir(self.config.hd3_location)) != 0

    @log_duration
    def test_processing_inprogress(self):
        loops = Loops()
        # processing_started is a class attribute shared by every Loops
        # instance; restore it afterwards so the simulated "in progress"
        # state does not leak into tests that run later in the same process.
        previous = Loops.processing_started
        Loops.processing_started = True
        try:
            assert loops.ProcessSingleFile() is False
        finally:
            Loops.processing_started = previous

# ... (the source listing is truncated here; the next listing is models.py)
Source:models.py  
...
from openerp import _, api, fields, models
from .decorator import log_duration
#class StockPicking(models.Model):
#    _inherit = 'stock.picking'
#    @log_duration(pdb_limit=5.0)
#    @api.multi
#    def read(self, fields=None, load='_classic_read'):
#        return super(StockPicking, self).read(fields=fields, load=load)
#class StockMove(models.Model):
#    _inherit = 'stock.move'
#    @log_duration(pdb_limit=5.0)
#    @api.multi
#    def read(self, fields=None, load='_classic_read'):
#        return super(StockMove, self).read(fields=fields, load=load)
#class AccountInvoice(models.Model):
#    _inherit = 'account.invoice'
#    @log_duration(pdb_limit=3.0)
#    @api.multi
#    def read(self, fields=None, load='_classic_read'):
#        return super(AccountInvoice, self).read(fields=fields, load=load)
#class AccountInvoiceLine(models.Model):
#    _inherit = 'account.invoice.line'
#    @log_duration(pdb_limit=1.0)
#    @api.multi
#    def read(self, fields=None, load='_classic_read'):
#        return super(AccountInvoiceLine, self).read(fields=fields, load=load)
#class PurchaseOrder(models.Model):
#    _inherit = 'purchase.order'
#    @log_duration(pdb_limit=3.0)
#    @api.multi
#    def read(self, fields=None, load='_classic_read'):
#        return super(PurchaseOrder, self).read(fields=fields, load=load)
#class PurchaseOrderLine(models.Model):
#    _inherit = 'purchase.order.line'
#    @log_duration(pdb_limit=1.0)
#    @api.multi
#    def read(self, fields=None, load='_classic_read'):
...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
