How to use apply_change method in localstack

Best Python code snippet using localstack_python

buffs.py

Source:buffs.py Github

copy

Full Screen

...
        # NOTE(review): this listing was recovered from a page export that
        # collapsed the file onto a few long lines and fused the viewer's gutter
        # numbers (162-342) into the code. Nesting below is reconstructed from
        # syntax and data flow; spots where more than one nesting is plausible
        # are marked NOTE(review) and should be confirmed against the upstream
        # buffs.py before relying on them.

        # Tracker for source 0. apply_change() below reads index 1 as the
        # current stack count and index 2 as the time of the last change.
        self.src_trackers[0] = [0, 0, encounter_start]

        self.data = []

    def apply_change(self, time, new_count, src_instid):
        """Close the running segment for ``src_instid`` and start a new one.

        When time has advanced since the tracker's last change, a row
        ``[start, duration, buff code, src_instid, dst_instid, count]`` is
        appended to ``self.data``; the tracker then records ``new_count`` and
        ``time``.
        """
        tracker = self.src_trackers[src_instid]
        duration = time - tracker[2]
        if duration > 0:
            self.data.append([tracker[2], duration, self.buff_type.code, src_instid, self.dst_instid, tracker[1]])
        # NOTE(review): assumed to run even when duration == 0, so that several
        # changes at the same timestamp still update the count - confirm.
        tracker[1] = new_count
        tracker[2] = time

    def add_event(self, event):
        """Apply one buff event after advancing the simulation to its time.

        Branches, in order: buff removal clears every stack; an off-cycle
        event extends every stack (at most once per timestamp); a new stack is
        added while below ``buff_type.capacity``; otherwise the first stack in
        sort order is replaced when the new one would end later.
        """
        if event.time != self.current_time:
            self.simulate_to_time(event.time)
        if event.is_buffremove:
            self.clear(event.time)
        elif event.is_offcycle:
            if self.last_extend_time != event.time:
                for stack in self.stack_durations:
                    stack[0] += event.value
                    # Index 2 is only extended for stacks already attributed
                    # to source 0.
                    if stack[1] == 0:
                        stack[2] += event.value;
                # NOTE(review): placed after the loop, inside the `if` - confirm.
                self.last_extend_time = event.time;
        elif len(self.stack_durations) < self.buff_type.capacity:
            # Stack entries are [end time, source instid, end time]; indexes 0
            # and 2 start out equal.
            end_time = event.time + event.value;
            self.stack_durations.append([end_time, event.ult_src_instid, end_time])
            self.stack_durations.sort()
            self.apply_change(event.time, self.src_trackers[event.ult_src_instid][1] + 1, event.ult_src_instid)
        elif self.stack_durations[0][0] < event.time + event.value:
            old_src = self.stack_durations[0][1]
            if old_src != event.ult_src_instid:
                # NOTE(review): both count updates are assumed to sit under
                # this `if` (same-source replacement leaves counts unchanged).
                self.apply_change(event.time, self.src_trackers[old_src][1] - 1, old_src)
                self.apply_change(event.time, self.src_trackers[event.ult_src_instid][1] + 1, event.ult_src_instid)
            end_time = event.time + event.value;
            self.stack_durations[0] = [end_time, event.ult_src_instid, end_time]
            self.stack_durations.sort()

    def clear(self, time):
        """Drop all stacks and reset every source tracker's count to 0 at ``time``."""
        if len(self.stack_durations) > 0:
            self.stack_durations = []
            # NOTE(review): the loop is assumed to be nested under the `if`;
            # it may instead run unconditionally - confirm.
            for x in self.src_trackers:
                self.apply_change(time, 0, x)

    def simulate_to_time(self, new_time):
        """Process every stack whose index-2 time is at or before ``new_time``.

        A stack still owned by a non-zero source is handed over to source 0
        and its index-2 time is moved to its index-0 end time; a stack already
        owned by source 0 is removed. ``self.current_time`` ends at ``new_time``.
        """
        self.stack_durations.sort(key=lambda x: x[2])
        while (len(self.stack_durations) > 0) and (self.stack_durations[0][2] <= new_time):
            if self.stack_durations[0][1] != 0:
                self.apply_change(self.stack_durations[0][2], self.src_trackers[self.stack_durations[0][1]][1] - 1, self.stack_durations[0][1])
                self.apply_change(self.stack_durations[0][2], self.src_trackers[0][1] + 1, 0)
                self.stack_durations[0][1] = 0
                self.stack_durations[0][2] = self.stack_durations[0][0]
                # Re-sort because the index-2 time of the head entry changed.
                self.stack_durations.sort(key=lambda x: x[2])
            else:
                self.apply_change(self.stack_durations[0][0], self.src_trackers[0][1] - 1, 0)
                del self.stack_durations[0]
        # NOTE(review): assumed to follow the loop (restores the default sort
        # order that add_event relies on) - confirm.
        self.stack_durations.sort()
#        while (len(self.stack_durations) > 0) and (self.stack_durations[0][0] <= new_time):
#            self.apply_change(self.stack_durations[0][0], self.src_trackers[self.stack_durations[0][1]][1] - 1, self.stack_durations[0][1])
#            del self.stack_durations[0]
        self.current_time = new_time

    def end_track(self, time):
        """Finish tracking: simulate up to ``int(time)``, then clear at ``time``."""
        end_time = int(time)
        self.simulate_to_time(end_time)
        self.clear(time)

class BuffTrackDuration:
    """Tracks a buff whose stacks are consumed one at a time.

    ``stack_durations`` entries are ``[remaining duration, source instid,
    off-cycle extension]`` (see add_event / simulate). ``src_trackers`` maps a
    source instid to ``[source instid, time of last change, count]`` where the
    count is 0 or 1 (see apply_change). ``current_src`` is -1 while no stack
    is active.
    """

    def __init__(self, buff_type, dst_instid, src_instids, encounter_start, encounter_end):
        # encounter_end is accepted but not used in this constructor.
        self.buff_type = buff_type
        self.dst_instid = dst_instid
        self.stack_durations = []
        self.data = []
        self.current_time = encounter_start
        self.current_src = -1
        self.src_trackers = {}
        for src in src_instids:
            self.src_trackers[src] = [src, encounter_start, 0]
        # Source 0 always gets a tracker, whether or not it is in src_instids.
        self.src_trackers[0] = [0, encounter_start, 0]

    def apply_change(self, time, src_instid):
        """Close the running segment for ``src_instid`` and start a new one.

        The new count is 1 when the head stack belongs to ``src_instid`` and 0
        otherwise. When time has advanced, a row ``[start, duration, buff
        code, src_instid, dst_instid, previous count]`` is appended to
        ``self.data``.
        """
        tracker = self.src_trackers[src_instid]
        duration = time - tracker[1]
        count = 0
        if len(self.stack_durations) > 0 and self.stack_durations[0][1] == src_instid:
            count = 1
        if duration > 0:
            self.data.append([tracker[1], duration, self.buff_type.code, src_instid, self.dst_instid, tracker[2]])
        # NOTE(review): assumed to run even when duration == 0 - confirm.
        tracker[1] = time
        tracker[2] = count

    def add_event(self, event):
        """Apply one buff event after simulating the elapsed time.

        Branches, in order: buff removal clears the stacks; an off-cycle event
        extends the head stack; a new stack is added while below
        ``buff_type.capacity``; otherwise the head stack is replaced when the
        new duration is longer. After each change the active source is
        switched if the head stack's source differs from ``current_src``.
        """
        if event.time != self.current_time:
            self.simulate(event.time - self.current_time)
        if event.is_buffremove:
            self.clear(event.time)
        elif event.is_offcycle:
            if len(self.stack_durations) > 0:
                self.stack_durations[0][0] += event.value
                # The extension is tracked separately for stacks that still
                # belong to a non-zero source.
                if self.stack_durations[0][1] != 0:
                    self.stack_durations[0][2] += event.value
                self.stack_durations.sort()
                if self.stack_durations[0][1] != self.current_src:
                    self.apply_change(event.time, self.current_src)
                    self.apply_change(event.time, self.stack_durations[0][1])
                    self.current_src = self.stack_durations[0][1]
        elif len(self.stack_durations) < self.buff_type.capacity:
            self.stack_durations.append([event.value, event.ult_src_instid, 0])
            if len(self.stack_durations) == 1:
                self.apply_change(event.time, self.stack_durations[0][1])
                self.current_src = self.stack_durations[0][1]
            else:
                self.stack_durations.sort()
                if self.stack_durations[0][1] != self.current_src:
                    self.apply_change(event.time, self.current_src)
                    self.apply_change(event.time, self.stack_durations[0][1])
                    self.current_src = self.stack_durations[0][1]

        elif self.stack_durations[0][0] < event.value:
            self.stack_durations[0] = [event.value, event.ult_src_instid, 0]
            self.stack_durations.sort()
            if self.stack_durations[0][1] != self.current_src:
                self.apply_change(event.time, self.current_src)
                self.apply_change(event.time, self.stack_durations[0][1])
                self.current_src = self.stack_durations[0][1]

    def simulate(self, delta_time):
        """Advance ``current_time`` by ``delta_time``, consuming head stacks.

        While the head stack fits in the remaining time it is consumed: if it
        belongs to a non-zero source and carries an extension, the
        non-extended part is spent first and the rest is handed to source 0.
        Whatever time is left is then subtracted from the new head stack.
        """
        remaining_delta = delta_time
        while len(self.stack_durations) > 0 and self.stack_durations[0][0] <= remaining_delta:
            if self.stack_durations[0][1] != 0 and self.stack_durations[0][2] > 0:
                # Spend the part of the stack that is not extension, then
                # attribute the extension to source 0.
                self.current_time += self.stack_durations[0][0] - self.stack_durations[0][2]
                remaining_delta -= self.stack_durations[0][0] - self.stack_durations[0][2]
                self.stack_durations[0][0] = self.stack_durations[0][2]
                self.stack_durations[0][1] = 0
                self.apply_change(self.current_time, self.current_src)
                self.apply_change(self.current_time, 0)
                self.current_src = 0;

            self.current_time += self.stack_durations[0][0]
            remaining_delta -= self.stack_durations[0][0]
            del self.stack_durations[0]
            if len(self.stack_durations) == 0:
                self.apply_change(self.current_time, self.current_src)
                self.current_src = -1
            elif self.current_src != self.stack_durations[0][1]:
                self.apply_change(self.current_time, self.current_src)
                self.apply_change(self.current_time, self.stack_durations[0][1])
                self.current_src = self.stack_durations[0][1]
        self.current_time += remaining_delta
        if len(self.stack_durations) > 0:
            if self.stack_durations[0][1] != 0 and self.stack_durations[0][0] - self.stack_durations[0][2] < remaining_delta:
                self.stack_durations[0][1] = 0
                self.apply_change(self.current_time, self.current_src)
                self.apply_change(self.current_time, 0)
                self.current_src = 0;

            self.stack_durations[0][0] -= remaining_delta

    def clear(self, time):
        """Drop all stacks and close the active source's segment at ``time``."""
        if len(self.stack_durations) > 0:
            self.stack_durations = []
            # NOTE(review): the inner `if` is assumed to be nested under the
            # outer one - confirm.
            if self.current_src != -1:
                self.apply_change(time, self.current_src)
                self.current_src = -1

    def end_track(self, time):
        """Finish tracking: simulate up to ``int(time)`` and flush every tracker."""
        end_time = int(time)
        self.simulate(end_time - self.current_time)
        for x in self.src_trackers:
            self.apply_change(time, x)

class BuffPreprocessor:
    def process_events(self, start_time, end_time, skills, players, player_events):
        def process_buff_events(buff_type, buff_events, raw_buff_data):
            for player in list(players.index):
                relevent_events = buff_events[buff_events['dst_instid'] == player]
                agent_start_time = self.get_time(player_events[player_events['src_instid'] == player], parser.StateChange.SPAWN, start_time)
                agent_end_time = self.get_time(player_events[player_events['src_instid'] == player], parser.StateChange.DESPAWN, end_time)
                if len(relevent_events) > 0:
                    # Widen the agent's window to the whole encounter when
                    # buff events fall outside its spawn/despawn times.
                    if relevent_events.time.min() < agent_start_time:
                        agent_start_time = start_time
                    if relevent_events.time.max() > agent_end_time:
                        agent_end_time = end_time
                    # NOTE(review): nesting of the next line is a guess; the
                    # listing is cut off immediately after it.
                    if (buff_type.stacking == StackType.INTENSITY):...

Full Screen

Full Screen

mac_sysctl.py

Source:mac_sysctl.py Github

copy

Full Screen

1# -*- coding: utf-8 -*-2'''3Module for viewing and modifying sysctl parameters4'''5from __future__ import absolute_import6# Import python libs7import os8# Import salt libs9import salt.utils.files10from salt.exceptions import CommandExecutionError11# Define the module's virtual name12__virtualname__ = 'sysctl'13def __virtual__():14 '''15 Only run on Darwin (macOS) systems16 '''17 if __grains__['os'] == 'MacOS':18 return __virtualname__19 return (False, 'The darwin_sysctl execution module cannot be loaded: '20 'Only available on macOS systems.')21def show(config_file=False):22 '''23 Return a list of sysctl parameters for this minion24 CLI Example:25 .. code-block:: bash26 salt '*' sysctl.show27 '''28 roots = (29 'audit',30 'debug',31 'hw',32 'hw',33 'kern',34 'machdep',35 'net',36 'net',37 'security',38 'user',39 'vfs',40 'vm',41 )42 cmd = 'sysctl -a'43 ret = {}44 out = __salt__['cmd.run'](cmd, output_loglevel='trace', python_shell=False)45 comps = ['']46 for line in out.splitlines():47 # This might need to be converted to a regex, and more, as sysctl output48 # can for some reason contain entries such as:49 #50 # user.tzname_max = 25551 # kern.clockrate: hz = 100, tick = 10000, profhz = 100, stathz = 10052 # kern.clockrate: {hz = 100, tick = 10000, tickadj = 2, profhz = 100,53 # stathz = 100}54 #55 # Yes. That's two `kern.clockrate`.56 #57 if any([line.startswith('{0}.'.format(root)) for root in roots]):58 comps = line.split(': ' if ': ' in line else ' = ', 1)59 if len(comps) == 2:60 ret[comps[0]] = comps[1]61 else:62 ret[comps[0]] = ''63 elif comps[0]:64 ret[comps[0]] += '{0}\n'.format(line)65 else:66 continue67 return ret68def get(name):69 '''70 Return a single sysctl parameter for this minion71 name72 The name of the sysctl value to display.73 CLI Example:74 .. 
code-block:: bash75 salt '*' sysctl.get hw.physmem76 '''77 cmd = 'sysctl -n {0}'.format(name)78 out = __salt__['cmd.run'](cmd, python_shell=False)79 return out80def assign(name, value):81 '''82 Assign a single sysctl parameter for this minion83 name84 The name of the sysctl value to edit.85 value86 The sysctl value to apply.87 CLI Example:88 .. code-block:: bash89 salt '*' sysctl.assign net.inet.icmp.icmplim 5090 '''91 ret = {}92 cmd = 'sysctl -w {0}="{1}"'.format(name, value)93 data = __salt__['cmd.run_all'](cmd, python_shell=False)94 if data['retcode'] != 0:95 raise CommandExecutionError('sysctl failed: {0}'.format(96 data['stderr']))97 new_name, new_value = data['stdout'].split(':', 1)98 ret[new_name] = new_value.split(' -> ')[-1]99 return ret100def persist(name, value, config='/etc/sysctl.conf', apply_change=False):101 '''102 Assign and persist a simple sysctl parameter for this minion103 name104 The name of the sysctl value to edit.105 value106 The sysctl value to apply.107 config108 The location of the sysctl configuration file.109 apply_change110 Default is False; Default behavior only creates or edits111 the sysctl.conf file. If apply is set to True, the changes are112 applied to the system.113 CLI Example:114 .. 
code-block:: bash115 salt '*' sysctl.persist net.inet.icmp.icmplim 50116 salt '*' sysctl.persist coretemp_load NO config=/etc/sysctl.conf117 '''118 nlines = []119 edited = False120 value = str(value)121 # If the sysctl.conf is not present, add it122 if not os.path.isfile(config):123 try:124 with salt.utils.files.fopen(config, 'w+') as _fh:125 _fh.write('#\n# Kernel sysctl configuration\n#\n')126 except (IOError, OSError):127 msg = 'Could not write to file: {0}'128 raise CommandExecutionError(msg.format(config))129 with salt.utils.files.fopen(config, 'r') as ifile:130 for line in ifile:131 if not line.startswith('{0}='.format(name)):132 nlines.append(line)133 continue134 else:135 key, rest = line.split('=', 1)136 if rest.startswith('"'):137 _, rest_v, rest = rest.split('"', 2)138 elif rest.startswith('\''):139 _, rest_v, rest = rest.split('\'', 2)140 else:141 rest_v = rest.split()[0]142 rest = rest[len(rest_v):]143 if rest_v == value:144 return 'Already set'145 new_line = '{0}={1}'.format(name, value)146 nlines.append(new_line)147 nlines.append('\n')148 edited = True149 if not edited:150 nlines.append('{0}={1}'.format(name, value))151 nlines.append('\n')152 with salt.utils.files.fopen(config, 'w+') as ofile:153 ofile.writelines(nlines)154 # If apply_change=True, apply edits to system155 if apply_change is True:156 assign(name, value)157 return 'Updated and applied'...

Full Screen

Full Screen

test_document.py

Source:test_document.py Github

copy

Full Screen

...
    # Past end of file
    assert doc.word_at_position({'line': 4, 'character': 0}) == ''


def test_document_empty_edit(workspace):
    """Inserting text at 0:0 of an empty document yields exactly that text."""
    doc = Document('file:///uri', workspace, '')
    doc.apply_change({
        'range': {
            'start': {'line': 0, 'character': 0},
            'end': {'line': 0, 'character': 0}
        },
        'text': 'f'
    })
    assert doc.source == 'f'


def test_document_line_edit(workspace):
    """Replacing characters 3-8 of a single line swaps 'hello' for 'goodbye'."""
    doc = Document('file:///uri', workspace, 'itshelloworld')
    doc.apply_change({
        'text': 'goodbye',
        'range': {
            'start': {'line': 0, 'character': 3},
            'end': {'line': 0, 'character': 8}
        }
    })
    assert doc.source == 'itsgoodbyeworld'


def test_document_multiline_edit(workspace):
    """A replacement spanning lines 1-2 collapses them into one line."""
    # NOTE(review): whitespace inside these string literals looks collapsed by
    # the page export; the edit range (character 4 to character 11) implies
    # the body lines were indented by four spaces - confirm against upstream.
    old = [
        "def hello(a, b):\n",
        " print a\n",
        " print b\n"
    ]
    doc = Document('file:///uri', workspace, ''.join(old))
    doc.apply_change({'text': 'print a, b', 'range': {
        'start': {'line': 1, 'character': 4},
        'end': {'line': 2, 'character': 11}
    }})
    assert doc.lines == [
        "def hello(a, b):\n",
        " print a, b\n"
    ]


def test_document_end_of_file_edit(workspace):
    """Inserting at line 2, one past the two existing lines, appends the text."""
    old = [
        "print 'a'\n",
        "print 'b'\n"
    ]
    doc = Document('file:///uri', workspace, ''.join(old))
    doc.apply_change({'text': 'o', 'range': {
        'start': {'line': 2, 'character': 0},
        'end': {'line': 2, 'character': 0}
    }})
    assert doc.lines == [
        "print 'a'\n",
        "print 'b'\n",
        "o",
...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful