How to use the control_master method in avocado

Best Python code snippets using avocado_python

time_conversion.py

Source: time_conversion.py (GitHub)


import multiprocessing as mp
try:
    mp.set_start_method('forkserver')
    mp.set_forkserver_preload(['numpy','pandas','WIP.robust','Support.Interaction.datetools','netCDF4'])
except:
    pass

from awrams.utils.awrams_log import get_module_logger

logger = get_module_logger('time_conversion')

def resample_data(in_path,in_pattern,variable,period,out_path,to_freq,method,mode='w',enforce_mask=True,extent=None,use_weights=False):
    '''
    method is 'sum' or 'mean'
    if no extent is supplied then the full (unmasked) input will be used
    'use_weights' should be set for unequally binned conversions (monthly->annual means, for example)
    '''
    from glob import glob
    import time
    import numpy as np
    from awrams.utils.messaging import reader as nr
    from awrams.utils.messaging import writer as nw
    from awrams.utils.messaging.brokers import OrderedFanInChunkBroker, FanOutChunkBroker
    from awrams.utils.messaging.general import message
    from awrams.utils.messaging.buffers import create_managed_buffers
    from awrams.utils.processing.chunk_resampler import ChunkedTimeResampler
    from awrams.utils.extents import subdivide_extent
    from awrams.utils import datetools as dt
    from awrams.utils import mapping_types as mt
    from awrams.utils.io import data_mapping as dm

    start = time.time()

    NWORKERS = 2
    read_ahead = 3
    writemax = 3
    BLOCKSIZE = 128
    nbuffers = (NWORKERS*2)+read_ahead+writemax

    # Receives all messages from clients
    '''
    Build the 'standard queues'
    This should be wrapped up somewhere else for
    various topologies...
    '''
    control_master = mp.Queue()

    worker_q = mp.Queue()
    for i in range(NWORKERS):
        worker_q.put(i)

    # Reader queues
    chunk_out_r = mp.Queue(read_ahead)
    reader_in = dict(control=mp.Queue())
    reader_out = dict(control=control_master,chunks=chunk_out_r)

    # Writer queues
    chunk_in_w = mp.Queue(writemax)
    writer_in = dict(control=mp.Queue(),chunks=chunk_in_w)
    writer_out = dict(control=control_master)

    # Fan-out/fan-in queues
    fanout_in = dict(control=mp.Queue(),chunks=chunk_out_r,workers=worker_q)
    fanout_out = dict(control=control_master)
    fanin_in = dict(control=mp.Queue())
    fanin_out = dict(control=control_master,out=chunk_in_w,workers=worker_q)

    # Worker queues
    work_inq = []
    work_outq = []
    for i in range(NWORKERS):
        work_inq.append(mp.Queue())
        fanout_out[i] = work_inq[-1]
        work_outq.append(mp.Queue())
        fanin_in[i] = work_outq[-1]
    '''
    End standard queues...
    '''

    infiles = glob(in_path+'/'+in_pattern)
    if len(infiles) > 1:
        ff = dm.filter_years(period)
    else:
        ff = None

    sfm = dm.SplitFileManager.open_existing(in_path,in_pattern,variable,ff=ff)
    in_freq = sfm.get_frequency()

    split_periods = [period]
    if hasattr(in_freq,'freqstr'):
        if in_freq.freqstr == 'D':
            # Force splitting so that flat files don't end up getting loaded entirely into memory!
            # Also a bit of a hack to deal with PeriodIndex/DTI issues...
            split_periods = dt.split_period(dt.resample_dti(period,'d',as_period=False),'a')

    in_periods = [dt.resample_dti(p,in_freq) for p in split_periods]
    in_pmap = sfm.get_period_map_multi(in_periods)

    out_periods = []
    for p in in_periods:
        out_periods.append(dt.resample_dti(p,to_freq))

    if extent is None:
        extent = sfm.ref_ds.get_extent(True)
    if extent.mask.size == 1:
        extent.mask = (np.ones(extent.shape)*extent.mask).astype(bool)

    sub_extents = subdivide_extent(extent,BLOCKSIZE)
    chunks = [nr.Chunk(*s.indices) for s in sub_extents]

    out_period = dt.resample_dti(period,to_freq)
    out_cs = mt.gen_coordset(out_period,extent)

    v = mt.Variable.from_ncvar(sfm.ref_ds.awra_var)
    in_dtype = sfm.ref_ds.awra_var.dtype
    sfm.close_all()

    use_weights = False
    if method == 'mean':
        if dt.validate_timeframe(in_freq) == 'MONTHLY':
            use_weights = True

    '''
    Need a way of formalising multiple buffer pools for different classes of
    work..
    '''
    max_inplen = max([len(p) for p in in_periods])
    bufshape = (max_inplen,BLOCKSIZE,BLOCKSIZE)
    shared_buffers = {}
    shared_buffers['main'] = create_managed_buffers(nbuffers,bufshape,build=False)

    mvar = mt.MappedVariable(v,out_cs,in_dtype)
    sfm = dm.FlatFileManager(out_path,mvar)
    CLOBBER = mode == 'w'
    sfm.create_files(False,CLOBBER,chunksize=(1,BLOCKSIZE,BLOCKSIZE))

    outfile_maps = {v.name:dict(nc_var=v.name,period_map=sfm.get_period_map_multi(out_periods))}
    infile_maps = {v.name:dict(nc_var=v.name,period_map=in_pmap)}

    reader = nr.StreamingReader(reader_in,reader_out,shared_buffers,infile_maps,chunks,in_periods)
    writer = nw.MultifileChunkWriter(writer_in,writer_out,shared_buffers,outfile_maps,sub_extents,out_periods,enforce_mask=enforce_mask)
    fanout = FanOutChunkBroker(fanout_in,fanout_out)
    fanin = OrderedFanInChunkBroker(fanin_in,fanin_out,NWORKERS,len(chunks))

    fanout.start()
    fanin.start()

    workers = []
    w_control = []
    for i in range(NWORKERS):
        w_in = dict(control=mp.Queue(),chunks=work_inq[i])
        w_out = dict(control=control_master,chunks=work_outq[i])
        w = ChunkedTimeResampler(w_in,w_out,shared_buffers,sub_extents,in_periods,to_freq,method,enforce_mask=enforce_mask,use_weights=use_weights)
        workers.append(w)
        w_control.append(w_in['control'])
        w.start()

    writer.start()
    reader.start()

    writer.join()

    # Ask the brokers and workers to shut down, then drain their
    # acknowledgements from the shared control_master queue
    fanout_in['control'].put(message('terminate'))
    fanin_in['control'].put(message('terminate'))
    for i in range(NWORKERS):
        w_control[i].put(message('terminate'))
    for x in range(4):
        control_master.get()
    for i in range(NWORKERS):
        workers[i].join()
        control_master.get()

    reader.join()
    fanout.join()
    fanin.join()

    end = time.time()
    ...
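In this snippet, control_master has nothing to do with SSH: it is a single multiprocessing.Queue passed as the control channel of every process in the pipeline (reader, writer, both brokers, and the workers). After sending 'terminate' messages, the parent drains one acknowledgement per process from control_master before joining them, so shutdown is ordered rather than racy. Below is a minimal, dependency-free sketch of that fan-in control pattern; the worker function and the ('finished', worker_id) message format are illustrative stand-ins, not part of awrams.

import multiprocessing as mp

def worker(worker_id, control_in, control_master):
    # Stand-in for a pipeline process: loop until the parent sends
    # 'terminate', then acknowledge on the shared control queue.
    while True:
        msg = control_in.get()
        if msg == 'terminate':
            control_master.put(('finished', worker_id))
            break

if __name__ == '__main__':
    NWORKERS = 2
    control_master = mp.Queue()                         # one shared fan-in control channel
    w_control = [mp.Queue() for _ in range(NWORKERS)]   # one command inbox per worker
    procs = [mp.Process(target=worker, args=(i, q, control_master))
             for i, q in enumerate(w_control)]
    for p in procs:
        p.start()
    for q in w_control:
        q.put('terminate')                              # ask every worker to shut down
    for _ in range(NWORKERS):
        print(control_master.get())                     # drain one ack per worker
    for p in procs:
        p.join()

The design keeps a separate inbox per worker for commands while all replies converge on one queue, which is why a single loop of control_master.get() calls is enough to confirm shutdown.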


rest_api.py

Source: rest_api.py (GitHub)


import json
from logging import getLogger

import bottle

ENDPOINT_RESET = "/reset"

_logger = getLogger("rest_api")

def register_rest_interface(app, manager):
    @app.post(ENDPOINT_RESET)
    def reset():
        manager.reset()
        return {"state": "ok",
                "status": "Backend reset completed."}

    @app.error(500)
    def error_handler_500(error):
        bottle.response.content_type = 'application/json'
        bottle.response.status = 200
        error_text = str(error.exception)
        _logger.error(error_text)
        return json.dumps({"state": "error",
                           "status": error_text})

class Manager(object):
    def __init__(self, ringbuffer, control_master):
        self.ringbuffer = ringbuffer
        self.control_master = control_master

    def reset(self):
        self.control_master.send_message(1)
        self.ringbuffer.reset_header()

def start_rest_api(rest_host, rest_port, ringbuffer, control_master):
    manager = Manager(ringbuffer, control_master)
    ringbuffer.create_buffer()
    app = bottle.Bottle()
    register_rest_interface(app, manager)
    try:
        _logger.info("Starting rest API on rest_host=%s, rest_port=%s." % (rest_host, rest_port))
        bottle.run(app=app, host=rest_host, port=rest_port)
    except KeyboardInterrupt:
        pass
    except:
        ...
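Here control_master is an injected dependency exposing a send_message() interface: POSTing to /reset invokes Manager.reset(), which pushes a control message and then resets the ring buffer header. A minimal sketch of how start_rest_api could be driven with stand-in objects; the stub classes, host, and port below are assumptions for illustration, not part of the source.

# Hypothetical stand-ins for the real ring buffer and control channel.
class StubControlMaster:
    def send_message(self, msg):
        print("control message sent: %r" % msg)

class StubRingbuffer:
    def create_buffer(self):
        print("buffer created")

    def reset_header(self):
        print("header reset")

if __name__ == '__main__':
    # With the snippet above on the import path, this serves POST /reset;
    # try it with: curl -X POST http://127.0.0.1:8080/reset
    start_rest_api("127.0.0.1", 8080, StubRingbuffer(), StubControlMaster())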


ssh.py

Source: ssh.py (GitHub)


import os
from subprocess import Popen, PIPE

from commander.utils import PStatus

class SSHClient(object):
    def __init__(self, host, identity_file=None, jumphost=None, control_master=False):
        self.host = host
        self.identity_file = identity_file
        self.jumphost = jumphost
        self.control_master = control_master

    def run(self, cmd):
        raise NotImplementedError()

class SSHExecClient(SSHClient):
    def run(self, cmd):
        extra = []
        if self.jumphost:
            extra.append('-o "ProxyCommand ssh -A %s nc %%h %%p"' % self.jumphost)
        if self.control_master:
            # Multiplex repeated invocations over one persistent connection
            extra.append('-o "ControlMaster auto"')
            extra.append('-o "ControlPath /tmp/commander_mux_%h_%p_%r"')
            extra.append('-o "ControlPersist 10m"')
        if self.identity_file:
            if os.path.isfile(self.identity_file):
                extra.append("-i %s" % self.identity_file)
            else:
                raise ValueError("identity_file should be a valid file")
        cmd = """ssh -T %s %s <<'EOF'
%s
EOF""" % (
            " ".join(extra),
            self.host,
            cmd,
        )
        p = Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE)
        out, err = p.communicate()
        ...
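This is the control_master most readers are looking for: OpenSSH connection multiplexing. With control_master=True, every run() adds ControlMaster auto (reuse an existing master connection, or become one), a ControlPath socket shared by subsequent sessions, and ControlPersist 10m (keep the master alive for ten minutes after the last session closes), so repeated commands skip the TCP and key-exchange overhead. A short usage sketch, with a placeholder hostname:

client = SSHExecClient("deploy.example.com", control_master=True)
client.run("uptime")   # first call establishes the master connection
client.run("df -h")    # reuses the socket at /tmp/commander_mux_%h_%p_%r

# Each run() shells out to roughly:
#   ssh -T -o "ControlMaster auto" \
#       -o "ControlPath /tmp/commander_mux_%h_%p_%r" \
#       -o "ControlPersist 10m" deploy.example.com <<'EOF'
#   uptime
#   EOF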


Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving into advanced test scenarios. The Learning Hub compiles step-by-step guides to help you become proficient with test automation frameworks such as Selenium, Cypress, and TestNG.

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run avocado automation tests on the LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

