Best Python code snippet using avocado_python
time_conversion.py
Source:time_conversion.py  
1import multiprocessing as mp2try:3    mp.set_start_method('forkserver')4    mp.set_forkserver_preload(['numpy','pandas','WIP.robust','Support.Interaction.datetools','netCDF4'])5except:6    pass7from awrams.utils.awrams_log import get_module_logger8logger = get_module_logger('time_conversion')9def resample_data(in_path,in_pattern,variable,period,out_path,to_freq,method,mode='w',enforce_mask=True,extent=None,use_weights=False):10    '''11    method is 'sum' or 'mean'12    if no extent is supplied then the full (unmasked) input will be used13    'use_weights' should be set for unequally binned conversions (monthly->annual means, for example)14    '''15    from glob import glob16    import time17    import numpy as np18    from awrams.utils.messaging import reader as nr19    from awrams.utils.messaging import writer as nw20    from awrams.utils.messaging.brokers import OrderedFanInChunkBroker, FanOutChunkBroker21    from awrams.utils.messaging.general import message22    from awrams.utils.messaging.buffers import create_managed_buffers23    from awrams.utils.processing.chunk_resampler import ChunkedTimeResampler24    from awrams.utils.extents import subdivide_extent25    from awrams.utils import datetools as dt26    from awrams.utils import mapping_types as mt27    from awrams.utils.io import data_mapping as dm28    start = time.time()29    NWORKERS = 230    read_ahead = 331    writemax = 332    BLOCKSIZE = 12833    nbuffers = (NWORKERS*2)+read_ahead+writemax34    # Receives all messages from clients35    '''36    Build the 'standard queues'37    This should be wrapped up somewhere else for 38    various topologies...39    '''40    control_master = mp.Queue()41    worker_q = mp.Queue()42    for i in range(NWORKERS):43        worker_q.put(i)44    #Reader Queues45    chunk_out_r = mp.Queue(read_ahead)46    reader_in = dict(control=mp.Queue())47    reader_out = dict(control=control_master,chunks=chunk_out_r)48    #Writer Queues49    chunk_in_w = mp.Queue(writemax)50    writer_in = dict(control=mp.Queue(),chunks=chunk_in_w)51    writer_out = dict(control=control_master)52    #FanIn queues53    fanout_in = dict(control=mp.Queue(),chunks=chunk_out_r,workers=worker_q)54    fanout_out = dict(control=control_master)55    fanin_in = dict(control=mp.Queue())56    fanin_out = dict(control=control_master,out=chunk_in_w,workers=worker_q)57    58    #Worker Queues59    work_inq = []60    work_outq= []61    for i in range(NWORKERS):62        work_inq.append(mp.Queue())63        fanout_out[i] = work_inq[-1]64        work_outq.append(mp.Queue())65        fanin_in[i] = work_outq[-1]66    '''67    End standard queues...68    '''69    infiles = glob(in_path+'/'+in_pattern)70    if len(infiles) > 1:71        ff = dm.filter_years(period)72    else:73        ff = None74    sfm = dm.SplitFileManager.open_existing(in_path,in_pattern,variable,ff=ff)75    in_freq = sfm.get_frequency()76    split_periods = [period]77    if hasattr(in_freq,'freqstr'):78        if in_freq.freqstr == 'D':79            #Force splitting so that flat files don't end up getting loaded entirely into memory!80            #Also a bit of a hack to deal with PeriodIndex/DTI issues...81            split_periods = dt.split_period(dt.resample_dti(period,'d',as_period=False),'a')82    in_periods = [dt.resample_dti(p,in_freq) for p in split_periods]83    in_pmap = sfm.get_period_map_multi(in_periods)84    out_periods = []85    for p in in_periods:86        out_periods.append(dt.resample_dti(p,to_freq))87    if extent is None:88        extent = sfm.ref_ds.get_extent(True)89        if extent.mask.size == 1:90            extent.mask = (np.ones(extent.shape)*extent.mask).astype(np.bool)91    sub_extents = subdivide_extent(extent,BLOCKSIZE)92    chunks = [nr.Chunk(*s.indices) for s in sub_extents]93    out_period = dt.resample_dti(period,to_freq)94    out_cs = mt.gen_coordset(out_period,extent)95    v = mt.Variable.from_ncvar(sfm.ref_ds.awra_var)96    in_dtype = sfm.ref_ds.awra_var.dtype97    sfm.close_all()98    use_weights = False99    if method == 'mean':100        if dt.validate_timeframe(in_freq) == 'MONTHLY':101            use_weights = True102    '''103    Need a way of formalising multiple buffer pools for different classes of104    work..105    '''106    max_inplen = max([len(p) for p in in_periods])107    bufshape = (max_inplen,BLOCKSIZE,BLOCKSIZE)108    shared_buffers = {}109    shared_buffers['main'] = create_managed_buffers(nbuffers,bufshape,build=False)110    mvar = mt.MappedVariable(v,out_cs,in_dtype)111    sfm = dm.FlatFileManager(out_path,mvar)112    CLOBBER = mode=='w'113    sfm.create_files(False,CLOBBER,chunksize=(1,BLOCKSIZE,BLOCKSIZE))114    outfile_maps = {v.name:dict(nc_var=v.name,period_map=sfm.get_period_map_multi(out_periods))}115    infile_maps = {v.name:dict(nc_var=v.name,period_map=in_pmap)}116    reader = nr.StreamingReader(reader_in,reader_out,shared_buffers,infile_maps,chunks,in_periods)117    writer = nw.MultifileChunkWriter(writer_in,writer_out,shared_buffers,outfile_maps,sub_extents,out_periods,enforce_mask=enforce_mask)118    fanout = FanOutChunkBroker(fanout_in,fanout_out)119    fanin = OrderedFanInChunkBroker(fanin_in,fanin_out,NWORKERS,len(chunks))120    fanout.start()121    fanin.start()122    workers = []123    w_control = []124    for i in range(NWORKERS):125        w_in = dict(control=mp.Queue(),chunks=work_inq[i])126        w_out = dict(control=control_master,chunks=work_outq[i])127        w = ChunkedTimeResampler(w_in,w_out,shared_buffers,sub_extents,in_periods,to_freq,method,enforce_mask=enforce_mask,use_weights=use_weights)128        workers.append(w)129        w_control.append(w_in['control'])130        w.start()131    writer.start()132    reader.start()133    writer.join()134    fanout_in['control'].put(message('terminate'))135    fanin_in['control'].put(message('terminate'))136    for i in range(NWORKERS):137        w_control[i].put(message('terminate'))138    for x in range(4):139        control_master.get()140    for i in range(NWORKERS):141        workers[i].join()142        control_master.get()143    reader.join()144    fanout.join()145    fanin.join()146    end = time.time()...rest_api.py
Source:rest_api.py  
1import json2from logging import getLogger3import bottle4ENDPOINT_RESET = "/reset"5_logger = getLogger("rest_api")6def register_rest_interface(app, manager):7    @app.post(ENDPOINT_RESET)8    def reset():9        manager.reset()10        return {"state": "ok",11                "status": "Backend reset completed."}12    @app.error(500)13    def error_handler_500(error):14        bottle.response.content_type = 'application/json'15        bottle.response.status = 20016        error_text = str(error.exception)17        _logger.error(error_text)18        return json.dumps({"state": "error",19                           "status": error_text})20class Manager(object):21    def __init__(self, ringbuffer, control_master):22        self.ringbuffer = ringbuffer23        self.control_master = control_master24    def reset(self):25        self.control_master.send_message(1)26        self.ringbuffer.reset_header()27def start_rest_api(rest_host, rest_port, ringbuffer, control_master):28    manager = Manager(ringbuffer, control_master)29    ringbuffer.create_buffer()30    app = bottle.Bottle()31    register_rest_interface(app, manager)32    try:33        _logger.info("Starting rest API on rest_host=%s, rest_port=%s." % (rest_host,rest_port))34        bottle.run(app=app, host=rest_host, port=rest_port)35    except KeyboardInterrupt:36        pass37    except:...ssh.py
Source:ssh.py  
1import os2from subprocess import Popen, PIPE3from commander.utils import PStatus4class SSHClient(object):5    def __init__(self, host, identity_file=None, jumphost=None, control_master=False):6        self.host = host7        self.identity_file = identity_file8        self.jumphost = jumphost9        self.control_master = control_master10    def run(self, cmd):11        raise NotImplementedError()12class SSHExecClient(SSHClient):13    def run(self, cmd):14        extra = []15        if self.jumphost:16            extra.append('-o "ProxyCommand ssh -A %s nc %%h %%p' % self.jumphost)17        if self.control_master:18            extra.append('-o "ControlMaster auto"')19            extra.append('-o "ControlPath /tmp/commander_mux_%h_%p_%r"')20            extra.append('-o "ControlPersist 10m"')21        if self.identity_file:22            if os.path.isfile(self.identity_file):23                extra.append("-i %s" % self.identity_file)24            else:25                raise ValueError("identity_file should be a valid file")26        cmd = """ssh -T %s %s <<'EOF'27            %s28EOF""" % (29            " ".join(extra),30            self.host,31            cmd,32        )33        p = Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE)34        out, err = p.communicate()...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
